From f8c27a85dd8d06c28ea6be02fc404b7d39a6428d Mon Sep 17 00:00:00 2001 From: Sanskar Jaiswal Date: Thu, 7 Apr 2022 16:12:53 +0530 Subject: [PATCH] Garbage collect with provided retention options. Introduce two new flags to configure the ttl of an artifact and the max no. of files to retain for an artifact. Modify the gc process to consider the options and use timeouts to prevent the controller from hanging. This helps in situations when the SC has already garbage collected the current artifact but the advertised artifact url is still the same, which leads to the server returning a 404. Signed-off-by: Sanskar Jaiswal --- controllers/bucket_controller.go | 15 +- controllers/bucket_controller_test.go | 26 +- controllers/gitrepository_controller.go | 16 +- controllers/gitrepository_controller_test.go | 142 +++++++++++ controllers/helmchart_controller.go | 18 +- controllers/helmchart_controller_test.go | 28 ++- controllers/helmrepository_controller.go | 18 +- controllers/helmrepository_controller_test.go | 21 +- controllers/storage.go | 180 +++++++++++++- controllers/storage_test.go | 233 +++++++++++++++++- controllers/suite_test.go | 8 +- main.go | 50 ++-- tests/fuzz/gitrepository_fuzzer.go | 2 +- 13 files changed, 665 insertions(+), 92 deletions(-) diff --git a/controllers/bucket_controller.go b/controllers/bucket_controller.go index b01236828..9d4a09889 100644 --- a/controllers/bucket_controller.go +++ b/controllers/bucket_controller.go @@ -632,14 +632,19 @@ func (r *BucketReconciler) garbageCollect(ctx context.Context, obj *sourcev1.Buc return nil } if obj.GetArtifact() != nil { - if deleted, err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil { - return &serror.Event{ - Err: fmt.Errorf("garbage collection of old artifacts failed: %s", err), + delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5) + if err != nil { + e := &serror.Event{ + Err: fmt.Errorf("garbage collection of artifacts failed: %w", err), Reason: 
"GarbageCollectionFailed", } - } else if len(deleted) > 0 { + r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error()) + return e + } + if len(delFiles) > 0 { r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded", - "garbage collected old artifacts") + fmt.Sprintf("garbage collected %d artifacts", len(delFiles))) + return nil } } return nil diff --git a/controllers/bucket_controller_test.go b/controllers/bucket_controller_test.go index 2f432a4bb..c0b12bf90 100644 --- a/controllers/bucket_controller_test.go +++ b/controllers/bucket_controller_test.go @@ -176,7 +176,7 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) { { name: "garbage collects", beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error { - revisions := []string{"a", "b", "c"} + revisions := []string{"a", "b", "c", "d"} for n := range revisions { v := revisions[n] obj.Status.Artifact = &sourcev1.Artifact{ @@ -186,26 +186,30 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) { if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { return err } - if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil { + if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0o644); err != nil { return err } + if n != len(revisions)-1 { + time.Sleep(time.Second * 1) + } } testStorage.SetArtifactURL(obj.Status.Artifact) return nil }, - want: sreconcile.ResultSuccess, assertArtifact: &sourcev1.Artifact{ - Path: "/reconcile-storage/c.txt", - Revision: "c", - Checksum: "2e7d2c03a9507ae265ecf5b5356885a53393a2029d241394997265a1a25aefc6", - URL: testStorage.Hostname + "/reconcile-storage/c.txt", - Size: int64p(int64(len("c"))), + Path: "/reconcile-storage/d.txt", + Revision: "d", + Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4", + URL: testStorage.Hostname + "/reconcile-storage/d.txt", + Size: int64p(int64(len("d"))), }, assertPaths: []string{ + 
"/reconcile-storage/d.txt", "/reconcile-storage/c.txt", "!/reconcile-storage/b.txt", "!/reconcile-storage/a.txt", }, + want: sreconcile.ResultSuccess, }, { name: "notices missing artifact in storage", @@ -237,7 +241,7 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) { if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { return err } - if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0644); err != nil { + if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0o644); err != nil { return err } return nil @@ -259,6 +263,10 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) { t.Run(tt.name, func(t *testing.T) { g := NewWithT(t) + defer func() { + g.Expect(os.RemoveAll(filepath.Join(testStorage.BasePath, "/reconcile-storage"))).To(Succeed()) + }() + r := &BucketReconciler{ EventRecorder: record.NewFakeRecorder(32), Storage: testStorage, diff --git a/controllers/gitrepository_controller.go b/controllers/gitrepository_controller.go index 2aa0f8589..e04d35c9f 100644 --- a/controllers/gitrepository_controller.go +++ b/controllers/gitrepository_controller.go @@ -708,13 +708,19 @@ func (r *GitRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourc return nil } if obj.GetArtifact() != nil { - if deleted, err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil { - return &serror.Event{ - Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err), + delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5) + if err != nil { + e := &serror.Event{ + Err: fmt.Errorf("garbage collection of artifacts failed: %w", err), + Reason: "GarbageCollectionFailed", } - } else if len(deleted) > 0 { + r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error()) + return e + } + if len(delFiles) > 0 { r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded", - "garbage collected old artifacts") + 
fmt.Sprintf("garbage collected %d artifacts", len(delFiles))) + return nil } } return nil diff --git a/controllers/gitrepository_controller_test.go b/controllers/gitrepository_controller_test.go index 88fceb7e7..0ae071272 100644 --- a/controllers/gitrepository_controller_test.go +++ b/controllers/gitrepository_controller_test.go @@ -1104,6 +1104,148 @@ func TestGitRepositoryReconciler_reconcileInclude(t *testing.T) { } } +func TestGitRepositoryReconciler_reconcileStorage(t *testing.T) { + tests := []struct { + name string + beforeFunc func(obj *sourcev1.GitRepository, storage *Storage) error + want sreconcile.Result + wantErr bool + assertArtifact *sourcev1.Artifact + assertConditions []metav1.Condition + assertPaths []string + }{ + { + name: "garbage collects", + beforeFunc: func(obj *sourcev1.GitRepository, storage *Storage) error { + revisions := []string{"a", "b", "c", "d"} + for n := range revisions { + v := revisions[n] + obj.Status.Artifact = &sourcev1.Artifact{ + Path: fmt.Sprintf("/reconcile-storage/%s.txt", v), + Revision: v, + } + if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { + return err + } + if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0o644); err != nil { + return err + } + if n != len(revisions)-1 { + time.Sleep(time.Second * 1) + } + } + testStorage.SetArtifactURL(obj.Status.Artifact) + return nil + }, + assertArtifact: &sourcev1.Artifact{ + Path: "/reconcile-storage/d.txt", + Revision: "d", + Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4", + URL: testStorage.Hostname + "/reconcile-storage/d.txt", + Size: int64p(int64(len("d"))), + }, + assertPaths: []string{ + "/reconcile-storage/d.txt", + "/reconcile-storage/c.txt", + "!/reconcile-storage/b.txt", + "!/reconcile-storage/a.txt", + }, + want: sreconcile.ResultSuccess, + }, + { + name: "notices missing artifact in storage", + beforeFunc: func(obj *sourcev1.GitRepository, storage *Storage) error { + 
obj.Status.Artifact = &sourcev1.Artifact{ + Path: "/reconcile-storage/invalid.txt", + Revision: "e", + } + testStorage.SetArtifactURL(obj.Status.Artifact) + return nil + }, + want: sreconcile.ResultSuccess, + assertPaths: []string{ + "!/reconcile-storage/invalid.txt", + }, + assertConditions: []metav1.Condition{ + *conditions.TrueCondition(meta.ReconcilingCondition, "NoArtifact", "no artifact for resource in storage"), + }, + }, + { + name: "updates hostname on diff from current", + beforeFunc: func(obj *sourcev1.GitRepository, storage *Storage) error { + obj.Status.Artifact = &sourcev1.Artifact{ + Path: "/reconcile-storage/hostname.txt", + Revision: "f", + Checksum: "3b9c358f36f0a31b6ad3e14f309c7cf198ac9246e8316f9ce543d5b19ac02b80", + URL: "http://outdated.com/reconcile-storage/hostname.txt", + } + if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { + return err + } + if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0o644); err != nil { + return err + } + return nil + }, + want: sreconcile.ResultSuccess, + assertPaths: []string{ + "/reconcile-storage/hostname.txt", + }, + assertArtifact: &sourcev1.Artifact{ + Path: "/reconcile-storage/hostname.txt", + Revision: "f", + Checksum: "3b9c358f36f0a31b6ad3e14f309c7cf198ac9246e8316f9ce543d5b19ac02b80", + URL: testStorage.Hostname + "/reconcile-storage/hostname.txt", + Size: int64p(int64(len("file"))), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + defer func() { + g.Expect(os.RemoveAll(filepath.Join(testStorage.BasePath, "/reconcile-storage"))).To(Succeed()) + }() + + r := &GitRepositoryReconciler{ + EventRecorder: record.NewFakeRecorder(32), + Storage: testStorage, + } + + obj := &sourcev1.GitRepository{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-", + }, + } + if tt.beforeFunc != nil { + g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed()) + } + + var c *git.Commit + var as artifactSet + got, err := 
r.reconcileStorage(context.TODO(), obj, c, &as, "") + g.Expect(err != nil).To(Equal(tt.wantErr)) + g.Expect(got).To(Equal(tt.want)) + + g.Expect(obj.Status.Artifact).To(MatchArtifact(tt.assertArtifact)) + if tt.assertArtifact != nil && tt.assertArtifact.URL != "" { + g.Expect(obj.Status.Artifact.URL).To(Equal(tt.assertArtifact.URL)) + } + g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions)) + + for _, p := range tt.assertPaths { + absoluteP := filepath.Join(testStorage.BasePath, p) + if !strings.HasPrefix(p, "!") { + g.Expect(absoluteP).To(BeAnExistingFile()) + continue + } + g.Expect(absoluteP).NotTo(BeAnExistingFile()) + } + }) + } +} + func TestGitRepositoryReconciler_reconcileDelete(t *testing.T) { g := NewWithT(t) diff --git a/controllers/helmchart_controller.go b/controllers/helmchart_controller.go index 894eb99b6..5396d67dc 100644 --- a/controllers/helmchart_controller.go +++ b/controllers/helmchart_controller.go @@ -285,6 +285,9 @@ func (r *HelmChartReconciler) reconcile(ctx context.Context, obj *sourcev1.HelmC // they match the Storage server hostname of current runtime. func (r *HelmChartReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.HelmChart, build *chart.Build) (sreconcile.Result, error) { // Garbage collect previous advertised artifact(s) from storage + // Abort if it takes more than 5 seconds. + ctx, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() _ = r.garbageCollect(ctx, obj) // Determine if the advertised artifact is still in storage @@ -801,14 +804,19 @@ func (r *HelmChartReconciler) garbageCollect(ctx context.Context, obj *sourcev1. 
return nil } if obj.GetArtifact() != nil { - if deleted, err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil { - return &serror.Event{ - Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err), + delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5) + if err != nil { + e := &serror.Event{ + Err: fmt.Errorf("garbage collection of artifacts failed: %w", err), Reason: "GarbageCollectionFailed", } - } else if len(deleted) > 0 { + r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error()) + return e + } + if len(delFiles) > 0 { r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded", - "garbage collected old artifacts") + fmt.Sprintf("garbage collected %d artifacts", len(delFiles))) + return nil } } return nil diff --git a/controllers/helmchart_controller_test.go b/controllers/helmchart_controller_test.go index 522908c32..1ecddcd8a 100644 --- a/controllers/helmchart_controller_test.go +++ b/controllers/helmchart_controller_test.go @@ -177,7 +177,7 @@ func TestHelmChartReconciler_reconcileStorage(t *testing.T) { { name: "garbage collects", beforeFunc: func(obj *sourcev1.HelmChart, storage *Storage) error { - revisions := []string{"a", "b", "c"} + revisions := []string{"a", "b", "c", "d"} for n := range revisions { v := revisions[n] obj.Status.Artifact = &sourcev1.Artifact{ @@ -187,21 +187,25 @@ func TestHelmChartReconciler_reconcileStorage(t *testing.T) { if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { return err } - if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil { + if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0o644); err != nil { return err } + if n != len(revisions)-1 { + time.Sleep(time.Second * 1) + } } testStorage.SetArtifactURL(obj.Status.Artifact) return nil }, assertArtifact: &sourcev1.Artifact{ - Path: "/reconcile-storage/c.txt", - Revision: "c", - Checksum: 
"2e7d2c03a9507ae265ecf5b5356885a53393a2029d241394997265a1a25aefc6", - URL: testStorage.Hostname + "/reconcile-storage/c.txt", - Size: int64p(int64(len("c"))), + Path: "/reconcile-storage/d.txt", + Revision: "d", + Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4", + URL: testStorage.Hostname + "/reconcile-storage/d.txt", + Size: int64p(int64(len("d"))), }, assertPaths: []string{ + "/reconcile-storage/d.txt", "/reconcile-storage/c.txt", "!/reconcile-storage/b.txt", "!/reconcile-storage/a.txt", @@ -238,7 +242,7 @@ func TestHelmChartReconciler_reconcileStorage(t *testing.T) { if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { return err } - if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0644); err != nil { + if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0o644); err != nil { return err } return nil @@ -260,6 +264,10 @@ func TestHelmChartReconciler_reconcileStorage(t *testing.T) { t.Run(tt.name, func(t *testing.T) { g := NewWithT(t) + defer func() { + g.Expect(os.RemoveAll(filepath.Join(testStorage.BasePath, "/reconcile-storage"))).To(Succeed()) + }() + r := &HelmChartReconciler{ EventRecorder: record.NewFakeRecorder(32), Storage: testStorage, @@ -303,7 +311,7 @@ func TestHelmChartReconciler_reconcileSource(t *testing.T) { g.Expect(err).ToNot(HaveOccurred()) defer os.RemoveAll(tmpDir) - storage, err := NewStorage(tmpDir, "example.com", timeout) + storage, err := NewStorage(tmpDir, "example.com", retentionTTL, retentionRecords) g.Expect(err).ToNot(HaveOccurred()) gitArtifact := &sourcev1.Artifact{ @@ -777,7 +785,7 @@ func TestHelmChartReconciler_buildFromTarballArtifact(t *testing.T) { g.Expect(err).ToNot(HaveOccurred()) defer os.RemoveAll(tmpDir) - storage, err := NewStorage(tmpDir, "example.com", timeout) + storage, err := NewStorage(tmpDir, "example.com", retentionTTL, retentionRecords) g.Expect(err).ToNot(HaveOccurred()) chartsArtifact := 
&sourcev1.Artifact{ diff --git a/controllers/helmrepository_controller.go b/controllers/helmrepository_controller.go index cbad94102..ab6c2a199 100644 --- a/controllers/helmrepository_controller.go +++ b/controllers/helmrepository_controller.go @@ -241,6 +241,9 @@ func (r *HelmRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1. // they match the Storage server hostname of current runtime. func (r *HelmRepositoryReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.HelmRepository, _ *sourcev1.Artifact, _ *repository.ChartRepository) (sreconcile.Result, error) { // Garbage collect previous advertised artifact(s) from storage + // Abort if it takes more than 5 seconds. + ctx, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() _ = r.garbageCollect(ctx, obj) // Determine if the advertised artifact is still in storage @@ -515,14 +518,19 @@ func (r *HelmRepositoryReconciler) garbageCollect(ctx context.Context, obj *sour return nil } if obj.GetArtifact() != nil { - if deleted, err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil { - return &serror.Event{ - Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err), + delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5) + if err != nil { + e := &serror.Event{ + Err: fmt.Errorf("garbage collection of artifacts failed: %w", err), Reason: "GarbageCollectionFailed", } - } else if len(deleted) > 0 { + r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error()) + return e + } + if len(delFiles) > 0 { r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded", - "garbage collected old artifacts") + fmt.Sprintf("garbage collected %d artifacts", len(delFiles))) + return nil } } return nil diff --git a/controllers/helmrepository_controller_test.go b/controllers/helmrepository_controller_test.go index 95b770915..a4508d2f0 100644 --- a/controllers/helmrepository_controller_test.go +++ 
b/controllers/helmrepository_controller_test.go @@ -24,6 +24,7 @@ import ( "path/filepath" "strings" "testing" + "time" "github.com/darkowlzz/controller-check/status" "github.com/fluxcd/pkg/apis/meta" @@ -146,7 +147,7 @@ func TestHelmRepositoryReconciler_reconcileStorage(t *testing.T) { { name: "garbage collects", beforeFunc: func(obj *sourcev1.HelmRepository, storage *Storage) error { - revisions := []string{"a", "b", "c"} + revisions := []string{"a", "b", "c", "d"} for n := range revisions { v := revisions[n] obj.Status.Artifact = &sourcev1.Artifact{ @@ -156,21 +157,25 @@ func TestHelmRepositoryReconciler_reconcileStorage(t *testing.T) { if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { return err } - if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil { + if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0o644); err != nil { return err } + if n != len(revisions)-1 { + time.Sleep(time.Second * 1) + } } testStorage.SetArtifactURL(obj.Status.Artifact) return nil }, assertArtifact: &sourcev1.Artifact{ - Path: "/reconcile-storage/c.txt", - Revision: "c", - Checksum: "2e7d2c03a9507ae265ecf5b5356885a53393a2029d241394997265a1a25aefc6", - URL: testStorage.Hostname + "/reconcile-storage/c.txt", - Size: int64p(int64(len("c"))), + Path: "/reconcile-storage/d.txt", + Revision: "d", + Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4", + URL: testStorage.Hostname + "/reconcile-storage/d.txt", + Size: int64p(int64(len("d"))), }, assertPaths: []string{ + "/reconcile-storage/d.txt", "/reconcile-storage/c.txt", "!/reconcile-storage/b.txt", "!/reconcile-storage/a.txt", @@ -207,7 +212,7 @@ func TestHelmRepositoryReconciler_reconcileStorage(t *testing.T) { if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { return err } - if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0644); err != nil { + if err := 
testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0o644); err != nil { return err } return nil diff --git a/controllers/storage.go b/controllers/storage.go index 55f9a077c..d9358a2b1 100644 --- a/controllers/storage.go +++ b/controllers/storage.go @@ -19,6 +19,7 @@ package controllers import ( "archive/tar" "compress/gzip" + "context" "crypto/sha256" "fmt" "hash" @@ -26,21 +27,28 @@ import ( "net/url" "os" "path/filepath" + "sort" "strings" "time" securejoin "github.com/cyphar/filepath-securejoin" "github.com/go-git/go-git/v5/plumbing/format/gitignore" + kerrors "k8s.io/apimachinery/pkg/util/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/fluxcd/pkg/lockedfile" + "io/fs" + "github.com/fluxcd/pkg/untar" sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" - "github.com/fluxcd/source-controller/internal/fs" + sourcefs "github.com/fluxcd/source-controller/internal/fs" "github.com/fluxcd/source-controller/pkg/sourceignore" ) +const GarbageCountLimit = 1000 + // Storage manages artifacts type Storage struct { // BasePath is the local directory path where the source artifacts are stored. @@ -49,19 +57,25 @@ type Storage struct { // Hostname is the file server host name used to compose the artifacts URIs. Hostname string `json:"hostname"` - // Timeout for artifacts operations - Timeout time.Duration `json:"timeout"` + // ArtifactRetentionTTL is the duration of time that artifacts will be kept in + // storage before being garbage collected. + ArtifactRetentionTTL time.Duration `json:"artifactRetentionTTL"` + + // ArtifactRetentionRecords is the maximum number of artifacts to be kept in + // storage after a garbage collection. + ArtifactRetentionRecords int `json:"artifactRetentionRecords"` } // NewStorage creates the storage helper for a given path and hostname. 
-func NewStorage(basePath string, hostname string, timeout time.Duration) (*Storage, error) { +func NewStorage(basePath string, hostname string, artifactRetentionTTL time.Duration, artifactRetentionRecords int) (*Storage, error) { if f, err := os.Stat(basePath); os.IsNotExist(err) || !f.IsDir() { return nil, fmt.Errorf("invalid dir path: %s", basePath) } return &Storage{ - BasePath: basePath, - Hostname: hostname, - Timeout: timeout, + BasePath: basePath, + Hostname: hostname, + ArtifactRetentionTTL: artifactRetentionTTL, + ArtifactRetentionRecords: artifactRetentionRecords, }, nil } @@ -145,6 +159,150 @@ func (s *Storage) RemoveAllButCurrent(artifact sourcev1.Artifact) ([]string, err return deletedFiles, nil } +// getGarbageFiles returns all files that need to be garbage collected for the given artifact. +// Garbage files are determined based on the below flow: +// 1. collect all files with an expired ttl +// 2. if we satisfy maxItemsToBeRetained, then return +// 3. else, remove all files till the latest n files remain, where n=maxItemsToBeRetained +func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, totalCountLimit, maxItemsToBeRetained int, ttl time.Duration) ([]string, error) { + localPath := s.LocalPath(artifact) + dir := filepath.Dir(localPath) + garbageFiles := []string{} + filesWithCreatedTs := make(map[time.Time]string) + // sortedPaths contain all files sorted according to their created ts. 
+ sortedPaths := []string{} + now := time.Now().UTC() + totalFiles := 0 + var errors []string + creationTimestamps := []time.Time{} + _ = filepath.WalkDir(dir, func(path string, d fs.DirEntry, err error) error { + if err != nil { + errors = append(errors, err.Error()) + return nil + } + if totalFiles >= totalCountLimit { + return fmt.Errorf("reached file walking limit, already walked over: %d", totalFiles) + } + info, err := d.Info() + if err != nil { + errors = append(errors, err.Error()) + return nil + } + createdAt := info.ModTime().UTC() + diff := now.Sub(createdAt) + // compare the time difference between now and the time at which the file was created + // with the provided ttl. delete if difference is greater than the ttl. + expired := diff > ttl + if !info.IsDir() && info.Mode()&os.ModeSymlink != os.ModeSymlink { + if path != localPath && expired { + garbageFiles = append(garbageFiles, path) + } + totalFiles += 1 + filesWithCreatedTs[createdAt] = path + creationTimestamps = append(creationTimestamps, createdAt) + } + return nil + + }) + if len(errors) > 0 { + return nil, fmt.Errorf("can't walk over file: %s", strings.Join(errors, ",")) + } + + // We already collected enough garbage files to satisfy the no. of max + // items that are supposed to be retained, so exit early. + if totalFiles-len(garbageFiles) < maxItemsToBeRetained { + return garbageFiles, nil + } + + // sort all timestamps in an ascending order. 
+ sort.Slice(creationTimestamps, func(i, j int) bool { return creationTimestamps[i].Before(creationTimestamps[j]) }) + for _, ts := range creationTimestamps { + path, ok := filesWithCreatedTs[ts] + if !ok { + return garbageFiles, fmt.Errorf("failed to fetch file for created ts: %v", ts) + } + sortedPaths = append(sortedPaths, path) + } + + var collected int + noOfGarbageFiles := len(garbageFiles) + for _, path := range sortedPaths { + if path != localPath && !stringInSlice(path, garbageFiles) { + // If we previously collected a few garbage files with an expired ttl, then take that into account + // when checking whether we need to remove more files to satisfy the max no. of items allowed + // in the filesystem, along with the no. of files already removed in this loop. + if noOfGarbageFiles > 0 { + if (len(sortedPaths) - collected - len(garbageFiles)) > maxItemsToBeRetained { + garbageFiles = append(garbageFiles, path) + collected += 1 + } + } else { + if len(sortedPaths)-collected > maxItemsToBeRetained { + garbageFiles = append(garbageFiles, path) + collected += 1 + } + } + } + } + + return garbageFiles, nil +} + +// GarbageCollect removes all garbage files in the artifact dir according to the provided +// retention options. +func (s *Storage) GarbageCollect(ctx context.Context, artifact sourcev1.Artifact, timeout time.Duration) ([]string, error) { + delFilesChan := make(chan []string) + errChan := make(chan error) + // Abort if it takes more than the provided timeout duration. 
+ ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + go func() { + garbageFiles, err := s.getGarbageFiles(artifact, GarbageCountLimit, s.ArtifactRetentionRecords, s.ArtifactRetentionTTL) + if err != nil { + errChan <- err + return + } + var errors []error + var deleted []string + if len(garbageFiles) > 0 { + for _, file := range garbageFiles { + err := os.Remove(file) + if err != nil { + errors = append(errors, err) + } else { + deleted = append(deleted, file) + } + } + } + if len(errors) > 0 { + errChan <- kerrors.NewAggregate(errors) + return + } + delFilesChan <- deleted + }() + + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case delFiles := <-delFilesChan: + return delFiles, nil + case err := <-errChan: + return nil, err + } + } +} + +func stringInSlice(a string, list []string) bool { + for _, b := range list { + if b == a { + return true + } + } + return false +} + // ArtifactExist returns a boolean indicating whether the v1beta1.Artifact exists in storage and is a regular file. 
func (s *Storage) ArtifactExist(artifact sourcev1.Artifact) bool { fi, err := os.Lstat(s.LocalPath(artifact)) @@ -281,7 +439,7 @@ func (s *Storage) Archive(artifact *sourcev1.Artifact, dir string, filter Archiv return err } - if err := fs.RenameWithFallback(tmpName, localPath); err != nil { + if err := sourcefs.RenameWithFallback(tmpName, localPath); err != nil { return err } @@ -323,7 +481,7 @@ func (s *Storage) AtomicWriteFile(artifact *sourcev1.Artifact, reader io.Reader, return err } - if err := fs.RenameWithFallback(tfName, localPath); err != nil { + if err := sourcefs.RenameWithFallback(tfName, localPath); err != nil { return err } @@ -361,7 +519,7 @@ func (s *Storage) Copy(artifact *sourcev1.Artifact, reader io.Reader) (err error return err } - if err := fs.RenameWithFallback(tfName, localPath); err != nil { + if err := sourcefs.RenameWithFallback(tfName, localPath); err != nil { return err } @@ -421,7 +579,7 @@ func (s *Storage) CopyToPath(artifact *sourcev1.Artifact, subPath, toPath string if err != nil { return err } - if err := fs.RenameWithFallback(fromPath, toPath); err != nil { + if err := sourcefs.RenameWithFallback(fromPath, toPath); err != nil { return err } return nil diff --git a/controllers/storage_test.go b/controllers/storage_test.go index 7da575c64..627317546 100644 --- a/controllers/storage_test.go +++ b/controllers/storage_test.go @@ -19,11 +19,13 @@ package controllers import ( "archive/tar" "compress/gzip" + "context" "fmt" "io" "os" "path" "path/filepath" + "strings" "testing" "time" @@ -48,7 +50,7 @@ func TestStorageConstructor(t *testing.T) { } t.Cleanup(cleanupStoragePath(dir)) - if _, err := NewStorage("/nonexistent", "hostname", time.Minute); err == nil { + if _, err := NewStorage("/nonexistent", "hostname", time.Minute, 2); err == nil { t.Fatal("nonexistent path was allowable in storage constructor") } @@ -58,13 +60,13 @@ func TestStorageConstructor(t *testing.T) { } f.Close() - if _, err := NewStorage(f.Name(), "hostname", 
time.Minute); err == nil { + if _, err := NewStorage(f.Name(), "hostname", time.Minute, 2); err == nil { os.Remove(f.Name()) t.Fatal("file path was accepted as basedir") } os.Remove(f.Name()) - if _, err := NewStorage(dir, "hostname", time.Minute); err != nil { + if _, err := NewStorage(dir, "hostname", time.Minute, 2); err != nil { t.Fatalf("Valid path did not successfully return: %v", err) } } @@ -117,7 +119,7 @@ func TestStorage_Archive(t *testing.T) { } t.Cleanup(cleanupStoragePath(dir)) - storage, err := NewStorage(dir, "hostname", time.Minute) + storage, err := NewStorage(dir, "hostname", time.Minute, 2) if err != nil { t.Fatalf("error while bootstrapping storage: %v", err) } @@ -289,7 +291,7 @@ func TestStorageRemoveAllButCurrent(t *testing.T) { } t.Cleanup(func() { os.RemoveAll(dir) }) - s, err := NewStorage(dir, "hostname", time.Minute) + s, err := NewStorage(dir, "hostname", time.Minute, 2) if err != nil { t.Fatalf("Valid path did not successfully return: %v", err) } @@ -305,7 +307,7 @@ func TestStorageRemoveAllButCurrent(t *testing.T) { g.Expect(err).ToNot(HaveOccurred()) t.Cleanup(func() { os.RemoveAll(dir) }) - s, err := NewStorage(dir, "hostname", time.Minute) + s, err := NewStorage(dir, "hostname", time.Minute, 2) g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage") artifact := sourcev1.Artifact{ @@ -368,7 +370,7 @@ func TestStorageRemoveAll(t *testing.T) { g.Expect(err).ToNot(HaveOccurred()) t.Cleanup(func() { os.RemoveAll(dir) }) - s, err := NewStorage(dir, "hostname", time.Minute) + s, err := NewStorage(dir, "hostname", time.Minute, 2) g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage") artifact := sourcev1.Artifact{ @@ -398,7 +400,7 @@ func TestStorageCopyFromPath(t *testing.T) { } t.Cleanup(cleanupStoragePath(dir)) - storage, err := NewStorage(dir, "hostname", time.Minute) + storage, err := NewStorage(dir, "hostname", time.Minute, 2) if err != nil { t.Fatalf("error while bootstrapping storage: %v", err) } @@ -486,3 
+488,218 @@ func TestStorageCopyFromPath(t *testing.T) { }) } } + +func TestStorage_getGarbageFiles(t *testing.T) { + artifactFolder := path.Join("foo", "bar") + tests := []struct { + name string + artifactPaths []string + createPause time.Duration + ttl time.Duration + maxItemsToBeRetained int + totalCountLimit int + wantDeleted []string + }{ + { + name: "delete files based on maxItemsToBeRetained", + artifactPaths: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + path.Join(artifactFolder, "artifact3.tar.gz"), + path.Join(artifactFolder, "artifact4.tar.gz"), + path.Join(artifactFolder, "artifact5.tar.gz"), + }, + createPause: time.Millisecond * 10, + ttl: time.Minute * 2, + totalCountLimit: 10, + maxItemsToBeRetained: 2, + wantDeleted: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + path.Join(artifactFolder, "artifact3.tar.gz"), + }, + }, + { + name: "delete files based on ttl", + artifactPaths: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + path.Join(artifactFolder, "artifact3.tar.gz"), + path.Join(artifactFolder, "artifact4.tar.gz"), + path.Join(artifactFolder, "artifact5.tar.gz"), + }, + createPause: time.Second * 1, + ttl: time.Second*3 + time.Millisecond*500, + totalCountLimit: 10, + maxItemsToBeRetained: 4, + wantDeleted: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + }, + }, + { + name: "delete files based on ttl and maxItemsToBeRetained", + artifactPaths: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + path.Join(artifactFolder, "artifact3.tar.gz"), + path.Join(artifactFolder, "artifact4.tar.gz"), + path.Join(artifactFolder, "artifact5.tar.gz"), + path.Join(artifactFolder, "artifact6.tar.gz"), + }, + createPause: time.Second * 1, + ttl: time.Second*5 + 
time.Millisecond*500, + totalCountLimit: 10, + maxItemsToBeRetained: 4, + wantDeleted: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + }, + }, + { + name: "delete files based on ttl and maxItemsToBeRetained and totalCountLimit", + artifactPaths: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + path.Join(artifactFolder, "artifact3.tar.gz"), + path.Join(artifactFolder, "artifact4.tar.gz"), + path.Join(artifactFolder, "artifact5.tar.gz"), + path.Join(artifactFolder, "artifact6.tar.gz"), + }, + createPause: time.Millisecond * 500, + ttl: time.Millisecond * 500, + totalCountLimit: 3, + maxItemsToBeRetained: 2, + wantDeleted: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + path.Join(artifactFolder, "artifact3.tar.gz"), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + dir, err := os.MkdirTemp("", "") + g.Expect(err).ToNot(HaveOccurred()) + t.Cleanup(func() { os.RemoveAll(dir) }) + + s, err := NewStorage(dir, "hostname", tt.ttl, tt.maxItemsToBeRetained) + g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage") + + artifact := sourcev1.Artifact{ + Path: tt.artifactPaths[len(tt.artifactPaths)-1], + } + g.Expect(os.MkdirAll(path.Join(dir, artifactFolder), 0o755)).ToNot(HaveOccurred()) + for _, artifactPath := range tt.artifactPaths { + f, err := os.Create(path.Join(dir, artifactPath)) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(f.Close()).ToNot(HaveOccurred()) + time.Sleep(tt.createPause) + } + + deletedPaths, err := s.getGarbageFiles(artifact, tt.totalCountLimit, tt.maxItemsToBeRetained, tt.ttl) + g.Expect(err).ToNot(HaveOccurred(), "failed to collect garbage files") + g.Expect(len(tt.wantDeleted)).To(Equal(len(deletedPaths))) + for _, wantDeletedPath := range tt.wantDeleted { + present := false + for _, deletedPath := 
range deletedPaths { + if strings.Contains(deletedPath, wantDeletedPath) { + present = true + break + } + } + if !present { + g.Fail(fmt.Sprintf("expected file to be deleted, still exists: %s", wantDeletedPath)) + } + } + }) + } +} + +func TestStorage_GarbageCollect(t *testing.T) { + artifactFolder := path.Join("foo", "bar") + tests := []struct { + name string + artifactPaths []string + wantDeleted []string + wantErr string + ctxTimeout time.Duration + }{ + { + name: "garbage collects", + artifactPaths: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + path.Join(artifactFolder, "artifact3.tar.gz"), + path.Join(artifactFolder, "artifact4.tar.gz"), + }, + wantDeleted: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + }, + ctxTimeout: time.Second * 1, + }, + { + name: "garbage collection fails with context timeout", + artifactPaths: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + path.Join(artifactFolder, "artifact3.tar.gz"), + path.Join(artifactFolder, "artifact4.tar.gz"), + }, + wantErr: "context deadline exceeded", + ctxTimeout: time.Nanosecond * 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + dir, err := os.MkdirTemp("", "") + g.Expect(err).ToNot(HaveOccurred()) + t.Cleanup(func() { os.RemoveAll(dir) }) + + s, err := NewStorage(dir, "hostname", time.Second*2, 2) + g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage") + + artifact := sourcev1.Artifact{ + Path: tt.artifactPaths[len(tt.artifactPaths)-1], + } + g.Expect(os.MkdirAll(path.Join(dir, artifactFolder), 0o755)).ToNot(HaveOccurred()) + for i, artifactPath := range tt.artifactPaths { + f, err := os.Create(path.Join(dir, artifactPath)) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(f.Close()).ToNot(HaveOccurred()) + if i != len(tt.artifactPaths)-1 { + 
time.Sleep(time.Second * 1) + } + } + + deletedPaths, err := s.GarbageCollect(context.TODO(), artifact, tt.ctxTimeout) + if tt.wantErr == "" { + g.Expect(err).ToNot(HaveOccurred(), "failed to collect garbage files") + } else { + g.Expect(err).To(HaveOccurred()) + g.Expect(err.Error()).To(ContainSubstring(tt.wantErr)) + } + if len(tt.wantDeleted) > 0 { + g.Expect(len(tt.wantDeleted)).To(Equal(len(deletedPaths))) + for _, wantDeletedPath := range tt.wantDeleted { + present := false + for _, deletedPath := range deletedPaths { + if strings.Contains(deletedPath, wantDeletedPath) { + g.Expect(deletedPath).ToNot(BeAnExistingFile()) + present = true + break + } + } + if present == false { + g.Fail(fmt.Sprintf("expected file to be deleted, still exists: %s", wantDeletedPath)) + } + } + } + }) + } +} diff --git a/controllers/suite_test.go b/controllers/suite_test.go index d61015b91..b495cbc27 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -46,8 +46,10 @@ import ( // Gomega. 
const ( - timeout = 10 * time.Second - interval = 1 * time.Second + timeout = 10 * time.Second + interval = 1 * time.Second + retentionTTL = 2 * time.Second + retentionRecords = 2 ) var ( @@ -181,7 +183,7 @@ func initTestTLS() { } func newTestStorage(s *testserver.HTTPServer) (*Storage, error) { - storage, err := NewStorage(s.Root(), s.URL(), timeout) + storage, err := NewStorage(s.Root(), s.URL(), retentionTTL, retentionRecords) if err != nil { return nil, err } diff --git a/main.go b/main.go index 98571d099..186577a62 100644 --- a/main.go +++ b/main.go @@ -73,25 +73,27 @@ func init() { func main() { var ( - metricsAddr string - eventsAddr string - healthAddr string - storagePath string - storageAddr string - storageAdvAddr string - concurrent int - requeueDependency time.Duration - watchAllNamespaces bool - helmIndexLimit int64 - helmChartLimit int64 - helmChartFileLimit int64 - clientOptions client.Options - logOptions logger.Options - leaderElectionOptions leaderelection.Options - helmCacheMaxSize int - helmCacheTTL string - helmCachePurgeInterval string - kexAlgos []string + metricsAddr string + eventsAddr string + healthAddr string + storagePath string + storageAddr string + storageAdvAddr string + concurrent int + requeueDependency time.Duration + watchAllNamespaces bool + helmIndexLimit int64 + helmChartLimit int64 + helmChartFileLimit int64 + clientOptions client.Options + logOptions logger.Options + leaderElectionOptions leaderelection.Options + helmCacheMaxSize int + helmCacheTTL string + helmCachePurgeInterval string + kexAlgos []string + artifactRetentionTTL time.Duration + artifactRetentionRecords int ) flag.StringVar(&metricsAddr, "metrics-addr", envOrDefault("METRICS_ADDR", ":8080"), @@ -124,6 +126,10 @@ func main() { "The interval at which the cache is purged. 
Valid time units are ns, us (or µs), ms, s, m, h.") flag.StringSliceVar(&kexAlgos, "ssh-kex-algos", []string{}, "The list of key exchange algorithms to use for ssh connections, arranged from most preferred to the least.") + flag.DurationVar(&artifactRetentionTTL, "artifact-retention-ttl", 60*time.Second, + "The duration of time that artifacts will be kept in storage before being garbage collected.") + flag.IntVar(&artifactRetentionRecords, "artifact-retention-records", 2, + "The maximum number of artifacts to be kept in storage after a garbage collection.") clientOptions.BindFlags(flag.CommandLine) logOptions.BindFlags(flag.CommandLine) @@ -177,7 +183,7 @@ func main() { if storageAdvAddr == "" { storageAdvAddr = determineAdvStorageAddr(storageAddr, setupLog) } - storage := mustInitStorage(storagePath, storageAdvAddr, setupLog) + storage := mustInitStorage(storagePath, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords, setupLog) setPreferredKexAlgos(kexAlgos) if err = (&controllers.GitRepositoryReconciler{ @@ -283,14 +289,14 @@ func startFileServer(path string, address string, l logr.Logger) { } } -func mustInitStorage(path string, storageAdvAddr string, l logr.Logger) *controllers.Storage { +func mustInitStorage(path string, storageAdvAddr string, artifactRetentionTTL time.Duration, artifactRetentionRecords int, l logr.Logger) *controllers.Storage { if path == "" { p, _ := os.Getwd() path = filepath.Join(p, "bin") os.MkdirAll(path, 0777) } - storage, err := controllers.NewStorage(path, storageAdvAddr, 5*time.Minute) + storage, err := controllers.NewStorage(path, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords) if err != nil { l.Error(err, "unable to initialise storage") os.Exit(1) diff --git a/tests/fuzz/gitrepository_fuzzer.go b/tests/fuzz/gitrepository_fuzzer.go index a81ecdc4c..d96877d40 100644 --- a/tests/fuzz/gitrepository_fuzzer.go +++ b/tests/fuzz/gitrepository_fuzzer.go @@ -174,7 +174,7 @@ func startEnvServer(setupReconcilers 
func(manager.Manager)) *envtest.Environment panic(err) } defer os.RemoveAll(tmpStoragePath) - storage, err = controllers.NewStorage(tmpStoragePath, "localhost:5050", time.Second*30) + storage, err = controllers.NewStorage(tmpStoragePath, "localhost:5050", time.Minute*1, 2) if err != nil { panic(err) }