diff --git a/controllers/bucket_controller.go b/controllers/bucket_controller.go index b01236828..9d4a09889 100644 --- a/controllers/bucket_controller.go +++ b/controllers/bucket_controller.go @@ -632,14 +632,19 @@ func (r *BucketReconciler) garbageCollect(ctx context.Context, obj *sourcev1.Buc return nil } if obj.GetArtifact() != nil { - if deleted, err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil { - return &serror.Event{ - Err: fmt.Errorf("garbage collection of old artifacts failed: %s", err), + delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5) + if err != nil { + e := &serror.Event{ + Err: fmt.Errorf("garbage collection of artifacts failed: %w", err), Reason: "GarbageCollectionFailed", } - } else if len(deleted) > 0 { + r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error()) + return e + } + if len(delFiles) > 0 { r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded", - "garbage collected old artifacts") + fmt.Sprintf("garbage collected %d artifacts", len(delFiles))) + return nil } } return nil diff --git a/controllers/bucket_controller_test.go b/controllers/bucket_controller_test.go index 2f432a4bb..c0b12bf90 100644 --- a/controllers/bucket_controller_test.go +++ b/controllers/bucket_controller_test.go @@ -176,7 +176,7 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) { { name: "garbage collects", beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error { - revisions := []string{"a", "b", "c"} + revisions := []string{"a", "b", "c", "d"} for n := range revisions { v := revisions[n] obj.Status.Artifact = &sourcev1.Artifact{ @@ -186,26 +186,30 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) { if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { return err } - if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil { + if err := testStorage.AtomicWriteFile(obj.Status.Artifact, 
strings.NewReader(v), 0o644); err != nil { return err } + if n != len(revisions)-1 { + time.Sleep(time.Second * 1) + } } testStorage.SetArtifactURL(obj.Status.Artifact) return nil }, - want: sreconcile.ResultSuccess, assertArtifact: &sourcev1.Artifact{ - Path: "/reconcile-storage/c.txt", - Revision: "c", - Checksum: "2e7d2c03a9507ae265ecf5b5356885a53393a2029d241394997265a1a25aefc6", - URL: testStorage.Hostname + "/reconcile-storage/c.txt", - Size: int64p(int64(len("c"))), + Path: "/reconcile-storage/d.txt", + Revision: "d", + Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4", + URL: testStorage.Hostname + "/reconcile-storage/d.txt", + Size: int64p(int64(len("d"))), }, assertPaths: []string{ + "/reconcile-storage/d.txt", "/reconcile-storage/c.txt", "!/reconcile-storage/b.txt", "!/reconcile-storage/a.txt", }, + want: sreconcile.ResultSuccess, }, { name: "notices missing artifact in storage", @@ -237,7 +241,7 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) { if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { return err } - if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0644); err != nil { + if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0o644); err != nil { return err } return nil @@ -259,6 +263,10 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) { t.Run(tt.name, func(t *testing.T) { g := NewWithT(t) + defer func() { + g.Expect(os.RemoveAll(filepath.Join(testStorage.BasePath, "/reconcile-storage"))).To(Succeed()) + }() + r := &BucketReconciler{ EventRecorder: record.NewFakeRecorder(32), Storage: testStorage, diff --git a/controllers/gitrepository_controller.go b/controllers/gitrepository_controller.go index 2aa0f8589..e04d35c9f 100644 --- a/controllers/gitrepository_controller.go +++ b/controllers/gitrepository_controller.go @@ -708,13 +708,19 @@ func (r *GitRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourc 
return nil } if obj.GetArtifact() != nil { - if deleted, err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil { - return &serror.Event{ - Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err), + delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5) + if err != nil { + e := &serror.Event{ + Err: fmt.Errorf("garbage collection of artifacts failed: %w", err), + Reason: "GarbageCollectionFailed", } - } else if len(deleted) > 0 { + r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error()) + return e + } + if len(delFiles) > 0 { r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded", - "garbage collected old artifacts") + fmt.Sprintf("garbage collected %d artifacts", len(delFiles))) + return nil } } return nil diff --git a/controllers/gitrepository_controller_test.go b/controllers/gitrepository_controller_test.go index 88fceb7e7..0ae071272 100644 --- a/controllers/gitrepository_controller_test.go +++ b/controllers/gitrepository_controller_test.go @@ -1104,6 +1104,148 @@ func TestGitRepositoryReconciler_reconcileInclude(t *testing.T) { } } +func TestGitRepositoryReconciler_reconcileStorage(t *testing.T) { + tests := []struct { + name string + beforeFunc func(obj *sourcev1.GitRepository, storage *Storage) error + want sreconcile.Result + wantErr bool + assertArtifact *sourcev1.Artifact + assertConditions []metav1.Condition + assertPaths []string + }{ + { + name: "garbage collects", + beforeFunc: func(obj *sourcev1.GitRepository, storage *Storage) error { + revisions := []string{"a", "b", "c", "d"} + for n := range revisions { + v := revisions[n] + obj.Status.Artifact = &sourcev1.Artifact{ + Path: fmt.Sprintf("/reconcile-storage/%s.txt", v), + Revision: v, + } + if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { + return err + } + if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0o644); err != nil { + return err + } + if n != 
len(revisions)-1 { + time.Sleep(time.Second * 1) + } + } + testStorage.SetArtifactURL(obj.Status.Artifact) + return nil + }, + assertArtifact: &sourcev1.Artifact{ + Path: "/reconcile-storage/d.txt", + Revision: "d", + Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4", + URL: testStorage.Hostname + "/reconcile-storage/d.txt", + Size: int64p(int64(len("d"))), + }, + assertPaths: []string{ + "/reconcile-storage/d.txt", + "/reconcile-storage/c.txt", + "!/reconcile-storage/b.txt", + "!/reconcile-storage/a.txt", + }, + want: sreconcile.ResultSuccess, + }, + { + name: "notices missing artifact in storage", + beforeFunc: func(obj *sourcev1.GitRepository, storage *Storage) error { + obj.Status.Artifact = &sourcev1.Artifact{ + Path: "/reconcile-storage/invalid.txt", + Revision: "e", + } + testStorage.SetArtifactURL(obj.Status.Artifact) + return nil + }, + want: sreconcile.ResultSuccess, + assertPaths: []string{ + "!/reconcile-storage/invalid.txt", + }, + assertConditions: []metav1.Condition{ + *conditions.TrueCondition(meta.ReconcilingCondition, "NoArtifact", "no artifact for resource in storage"), + }, + }, + { + name: "updates hostname on diff from current", + beforeFunc: func(obj *sourcev1.GitRepository, storage *Storage) error { + obj.Status.Artifact = &sourcev1.Artifact{ + Path: "/reconcile-storage/hostname.txt", + Revision: "f", + Checksum: "3b9c358f36f0a31b6ad3e14f309c7cf198ac9246e8316f9ce543d5b19ac02b80", + URL: "http://outdated.com/reconcile-storage/hostname.txt", + } + if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { + return err + } + if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0o644); err != nil { + return err + } + return nil + }, + want: sreconcile.ResultSuccess, + assertPaths: []string{ + "/reconcile-storage/hostname.txt", + }, + assertArtifact: &sourcev1.Artifact{ + Path: "/reconcile-storage/hostname.txt", + Revision: "f", + Checksum: 
"3b9c358f36f0a31b6ad3e14f309c7cf198ac9246e8316f9ce543d5b19ac02b80", + URL: testStorage.Hostname + "/reconcile-storage/hostname.txt", + Size: int64p(int64(len("file"))), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + defer func() { + g.Expect(os.RemoveAll(filepath.Join(testStorage.BasePath, "/reconcile-storage"))).To(Succeed()) + }() + + r := &GitRepositoryReconciler{ + EventRecorder: record.NewFakeRecorder(32), + Storage: testStorage, + } + + obj := &sourcev1.GitRepository{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-", + }, + } + if tt.beforeFunc != nil { + g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed()) + } + + var c *git.Commit + var as artifactSet + got, err := r.reconcileStorage(context.TODO(), obj, c, &as, "") + g.Expect(err != nil).To(Equal(tt.wantErr)) + g.Expect(got).To(Equal(tt.want)) + + g.Expect(obj.Status.Artifact).To(MatchArtifact(tt.assertArtifact)) + if tt.assertArtifact != nil && tt.assertArtifact.URL != "" { + g.Expect(obj.Status.Artifact.URL).To(Equal(tt.assertArtifact.URL)) + } + g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions)) + + for _, p := range tt.assertPaths { + absoluteP := filepath.Join(testStorage.BasePath, p) + if !strings.HasPrefix(p, "!") { + g.Expect(absoluteP).To(BeAnExistingFile()) + continue + } + g.Expect(absoluteP).NotTo(BeAnExistingFile()) + } + }) + } +} + func TestGitRepositoryReconciler_reconcileDelete(t *testing.T) { g := NewWithT(t) diff --git a/controllers/helmchart_controller.go b/controllers/helmchart_controller.go index 894eb99b6..5396d67dc 100644 --- a/controllers/helmchart_controller.go +++ b/controllers/helmchart_controller.go @@ -285,6 +285,9 @@ func (r *HelmChartReconciler) reconcile(ctx context.Context, obj *sourcev1.HelmC // they match the Storage server hostname of current runtime. 
func (r *HelmChartReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.HelmChart, build *chart.Build) (sreconcile.Result, error) { // Garbage collect previous advertised artifact(s) from storage + // Abort if it takes more than 5 seconds. + ctx, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() _ = r.garbageCollect(ctx, obj) // Determine if the advertised artifact is still in storage @@ -801,14 +804,19 @@ func (r *HelmChartReconciler) garbageCollect(ctx context.Context, obj *sourcev1. return nil } if obj.GetArtifact() != nil { - if deleted, err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil { - return &serror.Event{ - Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err), + delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5) + if err != nil { + e := &serror.Event{ + Err: fmt.Errorf("garbage collection of artifacts failed: %w", err), Reason: "GarbageCollectionFailed", } - } else if len(deleted) > 0 { + r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error()) + return e + } + if len(delFiles) > 0 { r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded", - "garbage collected old artifacts") + fmt.Sprintf("garbage collected %d artifacts", len(delFiles))) + return nil } } return nil diff --git a/controllers/helmchart_controller_test.go b/controllers/helmchart_controller_test.go index 522908c32..1ecddcd8a 100644 --- a/controllers/helmchart_controller_test.go +++ b/controllers/helmchart_controller_test.go @@ -177,7 +177,7 @@ func TestHelmChartReconciler_reconcileStorage(t *testing.T) { { name: "garbage collects", beforeFunc: func(obj *sourcev1.HelmChart, storage *Storage) error { - revisions := []string{"a", "b", "c"} + revisions := []string{"a", "b", "c", "d"} for n := range revisions { v := revisions[n] obj.Status.Artifact = &sourcev1.Artifact{ @@ -187,21 +187,25 @@ func TestHelmChartReconciler_reconcileStorage(t *testing.T) { if err := 
testStorage.MkdirAll(*obj.Status.Artifact); err != nil { return err } - if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil { + if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0o644); err != nil { return err } + if n != len(revisions)-1 { + time.Sleep(time.Second * 1) + } } testStorage.SetArtifactURL(obj.Status.Artifact) return nil }, assertArtifact: &sourcev1.Artifact{ - Path: "/reconcile-storage/c.txt", - Revision: "c", - Checksum: "2e7d2c03a9507ae265ecf5b5356885a53393a2029d241394997265a1a25aefc6", - URL: testStorage.Hostname + "/reconcile-storage/c.txt", - Size: int64p(int64(len("c"))), + Path: "/reconcile-storage/d.txt", + Revision: "d", + Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4", + URL: testStorage.Hostname + "/reconcile-storage/d.txt", + Size: int64p(int64(len("d"))), }, assertPaths: []string{ + "/reconcile-storage/d.txt", "/reconcile-storage/c.txt", "!/reconcile-storage/b.txt", "!/reconcile-storage/a.txt", @@ -238,7 +242,7 @@ func TestHelmChartReconciler_reconcileStorage(t *testing.T) { if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { return err } - if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0644); err != nil { + if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0o644); err != nil { return err } return nil @@ -260,6 +264,10 @@ func TestHelmChartReconciler_reconcileStorage(t *testing.T) { t.Run(tt.name, func(t *testing.T) { g := NewWithT(t) + defer func() { + g.Expect(os.RemoveAll(filepath.Join(testStorage.BasePath, "/reconcile-storage"))).To(Succeed()) + }() + r := &HelmChartReconciler{ EventRecorder: record.NewFakeRecorder(32), Storage: testStorage, @@ -303,7 +311,7 @@ func TestHelmChartReconciler_reconcileSource(t *testing.T) { g.Expect(err).ToNot(HaveOccurred()) defer os.RemoveAll(tmpDir) - storage, err := NewStorage(tmpDir, "example.com", 
timeout) + storage, err := NewStorage(tmpDir, "example.com", retentionTTL, retentionRecords) g.Expect(err).ToNot(HaveOccurred()) gitArtifact := &sourcev1.Artifact{ @@ -777,7 +785,7 @@ func TestHelmChartReconciler_buildFromTarballArtifact(t *testing.T) { g.Expect(err).ToNot(HaveOccurred()) defer os.RemoveAll(tmpDir) - storage, err := NewStorage(tmpDir, "example.com", timeout) + storage, err := NewStorage(tmpDir, "example.com", retentionTTL, retentionRecords) g.Expect(err).ToNot(HaveOccurred()) chartsArtifact := &sourcev1.Artifact{ diff --git a/controllers/helmrepository_controller.go b/controllers/helmrepository_controller.go index cbad94102..ab6c2a199 100644 --- a/controllers/helmrepository_controller.go +++ b/controllers/helmrepository_controller.go @@ -241,6 +241,9 @@ func (r *HelmRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1. // they match the Storage server hostname of current runtime. func (r *HelmRepositoryReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.HelmRepository, _ *sourcev1.Artifact, _ *repository.ChartRepository) (sreconcile.Result, error) { // Garbage collect previous advertised artifact(s) from storage + // Abort if it takes more than 5 seconds. 
+ ctx, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() _ = r.garbageCollect(ctx, obj) // Determine if the advertised artifact is still in storage @@ -515,14 +518,19 @@ func (r *HelmRepositoryReconciler) garbageCollect(ctx context.Context, obj *sour return nil } if obj.GetArtifact() != nil { - if deleted, err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil { - return &serror.Event{ - Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err), + delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5) + if err != nil { + e := &serror.Event{ + Err: fmt.Errorf("garbage collection of artifacts failed: %w", err), Reason: "GarbageCollectionFailed", } - } else if len(deleted) > 0 { + r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error()) + return e + } + if len(delFiles) > 0 { r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded", - "garbage collected old artifacts") + fmt.Sprintf("garbage collected %d artifacts", len(delFiles))) + return nil } } return nil diff --git a/controllers/helmrepository_controller_test.go b/controllers/helmrepository_controller_test.go index 95b770915..a4508d2f0 100644 --- a/controllers/helmrepository_controller_test.go +++ b/controllers/helmrepository_controller_test.go @@ -24,6 +24,7 @@ import ( "path/filepath" "strings" "testing" + "time" "github.com/darkowlzz/controller-check/status" "github.com/fluxcd/pkg/apis/meta" @@ -146,7 +147,7 @@ func TestHelmRepositoryReconciler_reconcileStorage(t *testing.T) { { name: "garbage collects", beforeFunc: func(obj *sourcev1.HelmRepository, storage *Storage) error { - revisions := []string{"a", "b", "c"} + revisions := []string{"a", "b", "c", "d"} for n := range revisions { v := revisions[n] obj.Status.Artifact = &sourcev1.Artifact{ @@ -156,21 +157,25 @@ func TestHelmRepositoryReconciler_reconcileStorage(t *testing.T) { if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { return err } 
- if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil { + if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0o644); err != nil { return err } + if n != len(revisions)-1 { + time.Sleep(time.Second * 1) + } } testStorage.SetArtifactURL(obj.Status.Artifact) return nil }, assertArtifact: &sourcev1.Artifact{ - Path: "/reconcile-storage/c.txt", - Revision: "c", - Checksum: "2e7d2c03a9507ae265ecf5b5356885a53393a2029d241394997265a1a25aefc6", - URL: testStorage.Hostname + "/reconcile-storage/c.txt", - Size: int64p(int64(len("c"))), + Path: "/reconcile-storage/d.txt", + Revision: "d", + Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4", + URL: testStorage.Hostname + "/reconcile-storage/d.txt", + Size: int64p(int64(len("d"))), }, assertPaths: []string{ + "/reconcile-storage/d.txt", "/reconcile-storage/c.txt", "!/reconcile-storage/b.txt", "!/reconcile-storage/a.txt", @@ -207,7 +212,7 @@ func TestHelmRepositoryReconciler_reconcileStorage(t *testing.T) { if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { return err } - if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0644); err != nil { + if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0o644); err != nil { return err } return nil diff --git a/controllers/storage.go b/controllers/storage.go index 55f9a077c..d9358a2b1 100644 --- a/controllers/storage.go +++ b/controllers/storage.go @@ -19,6 +19,7 @@ package controllers import ( "archive/tar" "compress/gzip" + "context" "crypto/sha256" "fmt" "hash" @@ -26,21 +27,28 @@ import ( "net/url" "os" "path/filepath" + "sort" "strings" "time" securejoin "github.com/cyphar/filepath-securejoin" "github.com/go-git/go-git/v5/plumbing/format/gitignore" + kerrors "k8s.io/apimachinery/pkg/util/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/fluxcd/pkg/lockedfile" + "io/fs" + 
"github.com/fluxcd/pkg/untar" sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" - "github.com/fluxcd/source-controller/internal/fs" + sourcefs "github.com/fluxcd/source-controller/internal/fs" "github.com/fluxcd/source-controller/pkg/sourceignore" ) +const GarbageCountLimit = 1000 + // Storage manages artifacts type Storage struct { // BasePath is the local directory path where the source artifacts are stored. @@ -49,19 +57,25 @@ type Storage struct { // Hostname is the file server host name used to compose the artifacts URIs. Hostname string `json:"hostname"` - // Timeout for artifacts operations - Timeout time.Duration `json:"timeout"` + // ArtifactRetentionTTL is the duration of time that artifacts will be kept in + // storage before being garbage collected. + ArtifactRetentionTTL time.Duration `json:"artifactRetentionTTL"` + + // ArtifactRetentionRecords is the maximum number of artifacts to be kept in + // storage after a garbage collection. + ArtifactRetentionRecords int `json:"artifactRetentionRecords"` } // NewStorage creates the storage helper for a given path and hostname. -func NewStorage(basePath string, hostname string, timeout time.Duration) (*Storage, error) { +func NewStorage(basePath string, hostname string, artifactRetentionTTL time.Duration, artifactRetentionRecords int) (*Storage, error) { if f, err := os.Stat(basePath); os.IsNotExist(err) || !f.IsDir() { return nil, fmt.Errorf("invalid dir path: %s", basePath) } return &Storage{ - BasePath: basePath, - Hostname: hostname, - Timeout: timeout, + BasePath: basePath, + Hostname: hostname, + ArtifactRetentionTTL: artifactRetentionTTL, + ArtifactRetentionRecords: artifactRetentionRecords, }, nil } @@ -145,6 +159,150 @@ func (s *Storage) RemoveAllButCurrent(artifact sourcev1.Artifact) ([]string, err return deletedFiles, nil } +// getGarbageFiles returns all files that need to be garbage collected for the given artifact. +// Garbage files are determined based on the below flow: +// 1. 
collect all files with an expired ttl +// 2. if we satisfy maxItemsToBeRetained, then return +// 3. else, remove all files till the latest n files remain, where n=maxItemsToBeRetained +func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, totalCountLimit, maxItemsToBeRetained int, ttl time.Duration) ([]string, error) { + localPath := s.LocalPath(artifact) + dir := filepath.Dir(localPath) + garbageFiles := []string{} + filesWithCreatedTs := make(map[time.Time]string) + // sortedPaths contain all files sorted according to their created ts. + sortedPaths := []string{} + now := time.Now().UTC() + totalFiles := 0 + var errors []string + creationTimestamps := []time.Time{} + _ = filepath.WalkDir(dir, func(path string, d fs.DirEntry, err error) error { + if err != nil { + errors = append(errors, err.Error()) + return nil + } + if totalFiles >= totalCountLimit { + return fmt.Errorf("reached file walking limit, already walked over: %d", totalFiles) + } + info, err := d.Info() + if err != nil { + errors = append(errors, err.Error()) + return nil + } + createdAt := info.ModTime().UTC() + diff := now.Sub(createdAt) + // compare the time difference between now and the time at which the file was created + // with the provided ttl. delete if difference is greater than the ttl. + expired := diff > ttl + if !info.IsDir() && info.Mode()&os.ModeSymlink != os.ModeSymlink { + if path != localPath && expired { + garbageFiles = append(garbageFiles, path) + } + totalFiles += 1 + filesWithCreatedTs[createdAt] = path + creationTimestamps = append(creationTimestamps, createdAt) + } + return nil + + }) + if len(errors) > 0 { + return nil, fmt.Errorf("can't walk over file: %s", strings.Join(errors, ",")) + } + + // We already collected enough garbage files to satisfy the no. of max + // items that are supposed to be retained, so exit early. + if totalFiles-len(garbageFiles) < maxItemsToBeRetained { + return garbageFiles, nil + } + + // sort all timestamps in an ascending order. 
+ sort.Slice(creationTimestamps, func(i, j int) bool { return creationTimestamps[i].Before(creationTimestamps[j]) }) + for _, ts := range creationTimestamps { + path, ok := filesWithCreatedTs[ts] + if !ok { + return garbageFiles, fmt.Errorf("failed to fetch file for created ts: %v", ts) + } + sortedPaths = append(sortedPaths, path) + } + + var collected int + noOfGarbageFiles := len(garbageFiles) + for _, path := range sortedPaths { + if path != localPath && !stringInSlice(path, garbageFiles) { + // If we previously collected a few garbage files with an expired ttl, then take that into account + // when checking whether we need to remove more files to satisfy the max no. of items allowed + // in the filesystem, along with the no. of files already removed in this loop. + if noOfGarbageFiles > 0 { + if (len(sortedPaths) - collected - len(garbageFiles)) > maxItemsToBeRetained { + garbageFiles = append(garbageFiles, path) + collected += 1 + } + } else { + if len(sortedPaths)-collected > maxItemsToBeRetained { + garbageFiles = append(garbageFiles, path) + collected += 1 + } + } + } + } + + return garbageFiles, nil +} + +// GarbageCollect removes all garbage files in the artifact dir according to the provided +// retention options. +func (s *Storage) GarbageCollect(ctx context.Context, artifact sourcev1.Artifact, timeout time.Duration) ([]string, error) { + delFilesChan := make(chan []string) + errChan := make(chan error) + // Abort if it takes more than the provided timeout duration. 
+ ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + go func() { + garbageFiles, err := s.getGarbageFiles(artifact, GarbageCountLimit, s.ArtifactRetentionRecords, s.ArtifactRetentionTTL) + if err != nil { + errChan <- err + return + } + var errors []error + var deleted []string + if len(garbageFiles) > 0 { + for _, file := range garbageFiles { + err := os.Remove(file) + if err != nil { + errors = append(errors, err) + } else { + deleted = append(deleted, file) + } + } + } + if len(errors) > 0 { + errChan <- kerrors.NewAggregate(errors) + return + } + delFilesChan <- deleted + }() + + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case delFiles := <-delFilesChan: + return delFiles, nil + case err := <-errChan: + return nil, err + } + } +} + +func stringInSlice(a string, list []string) bool { + for _, b := range list { + if b == a { + return true + } + } + return false +} + // ArtifactExist returns a boolean indicating whether the v1beta1.Artifact exists in storage and is a regular file. 
func (s *Storage) ArtifactExist(artifact sourcev1.Artifact) bool { fi, err := os.Lstat(s.LocalPath(artifact)) @@ -281,7 +439,7 @@ func (s *Storage) Archive(artifact *sourcev1.Artifact, dir string, filter Archiv return err } - if err := fs.RenameWithFallback(tmpName, localPath); err != nil { + if err := sourcefs.RenameWithFallback(tmpName, localPath); err != nil { return err } @@ -323,7 +481,7 @@ func (s *Storage) AtomicWriteFile(artifact *sourcev1.Artifact, reader io.Reader, return err } - if err := fs.RenameWithFallback(tfName, localPath); err != nil { + if err := sourcefs.RenameWithFallback(tfName, localPath); err != nil { return err } @@ -361,7 +519,7 @@ func (s *Storage) Copy(artifact *sourcev1.Artifact, reader io.Reader) (err error return err } - if err := fs.RenameWithFallback(tfName, localPath); err != nil { + if err := sourcefs.RenameWithFallback(tfName, localPath); err != nil { return err } @@ -421,7 +579,7 @@ func (s *Storage) CopyToPath(artifact *sourcev1.Artifact, subPath, toPath string if err != nil { return err } - if err := fs.RenameWithFallback(fromPath, toPath); err != nil { + if err := sourcefs.RenameWithFallback(fromPath, toPath); err != nil { return err } return nil diff --git a/controllers/storage_test.go b/controllers/storage_test.go index 7da575c64..627317546 100644 --- a/controllers/storage_test.go +++ b/controllers/storage_test.go @@ -19,11 +19,13 @@ package controllers import ( "archive/tar" "compress/gzip" + "context" "fmt" "io" "os" "path" "path/filepath" + "strings" "testing" "time" @@ -48,7 +50,7 @@ func TestStorageConstructor(t *testing.T) { } t.Cleanup(cleanupStoragePath(dir)) - if _, err := NewStorage("/nonexistent", "hostname", time.Minute); err == nil { + if _, err := NewStorage("/nonexistent", "hostname", time.Minute, 2); err == nil { t.Fatal("nonexistent path was allowable in storage constructor") } @@ -58,13 +60,13 @@ func TestStorageConstructor(t *testing.T) { } f.Close() - if _, err := NewStorage(f.Name(), "hostname", 
time.Minute); err == nil { + if _, err := NewStorage(f.Name(), "hostname", time.Minute, 2); err == nil { os.Remove(f.Name()) t.Fatal("file path was accepted as basedir") } os.Remove(f.Name()) - if _, err := NewStorage(dir, "hostname", time.Minute); err != nil { + if _, err := NewStorage(dir, "hostname", time.Minute, 2); err != nil { t.Fatalf("Valid path did not successfully return: %v", err) } } @@ -117,7 +119,7 @@ func TestStorage_Archive(t *testing.T) { } t.Cleanup(cleanupStoragePath(dir)) - storage, err := NewStorage(dir, "hostname", time.Minute) + storage, err := NewStorage(dir, "hostname", time.Minute, 2) if err != nil { t.Fatalf("error while bootstrapping storage: %v", err) } @@ -289,7 +291,7 @@ func TestStorageRemoveAllButCurrent(t *testing.T) { } t.Cleanup(func() { os.RemoveAll(dir) }) - s, err := NewStorage(dir, "hostname", time.Minute) + s, err := NewStorage(dir, "hostname", time.Minute, 2) if err != nil { t.Fatalf("Valid path did not successfully return: %v", err) } @@ -305,7 +307,7 @@ func TestStorageRemoveAllButCurrent(t *testing.T) { g.Expect(err).ToNot(HaveOccurred()) t.Cleanup(func() { os.RemoveAll(dir) }) - s, err := NewStorage(dir, "hostname", time.Minute) + s, err := NewStorage(dir, "hostname", time.Minute, 2) g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage") artifact := sourcev1.Artifact{ @@ -368,7 +370,7 @@ func TestStorageRemoveAll(t *testing.T) { g.Expect(err).ToNot(HaveOccurred()) t.Cleanup(func() { os.RemoveAll(dir) }) - s, err := NewStorage(dir, "hostname", time.Minute) + s, err := NewStorage(dir, "hostname", time.Minute, 2) g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage") artifact := sourcev1.Artifact{ @@ -398,7 +400,7 @@ func TestStorageCopyFromPath(t *testing.T) { } t.Cleanup(cleanupStoragePath(dir)) - storage, err := NewStorage(dir, "hostname", time.Minute) + storage, err := NewStorage(dir, "hostname", time.Minute, 2) if err != nil { t.Fatalf("error while bootstrapping storage: %v", err) } @@ -486,3 
+488,218 @@ func TestStorageCopyFromPath(t *testing.T) { }) } } + +func TestStorage_getGarbageFiles(t *testing.T) { + artifactFolder := path.Join("foo", "bar") + tests := []struct { + name string + artifactPaths []string + createPause time.Duration + ttl time.Duration + maxItemsToBeRetained int + totalCountLimit int + wantDeleted []string + }{ + { + name: "delete files based on maxItemsToBeRetained", + artifactPaths: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + path.Join(artifactFolder, "artifact3.tar.gz"), + path.Join(artifactFolder, "artifact4.tar.gz"), + path.Join(artifactFolder, "artifact5.tar.gz"), + }, + createPause: time.Millisecond * 10, + ttl: time.Minute * 2, + totalCountLimit: 10, + maxItemsToBeRetained: 2, + wantDeleted: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + path.Join(artifactFolder, "artifact3.tar.gz"), + }, + }, + { + name: "delete files based on ttl", + artifactPaths: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + path.Join(artifactFolder, "artifact3.tar.gz"), + path.Join(artifactFolder, "artifact4.tar.gz"), + path.Join(artifactFolder, "artifact5.tar.gz"), + }, + createPause: time.Second * 1, + ttl: time.Second*3 + time.Millisecond*500, + totalCountLimit: 10, + maxItemsToBeRetained: 4, + wantDeleted: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + }, + }, + { + name: "delete files based on ttl and maxItemsToBeRetained", + artifactPaths: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + path.Join(artifactFolder, "artifact3.tar.gz"), + path.Join(artifactFolder, "artifact4.tar.gz"), + path.Join(artifactFolder, "artifact5.tar.gz"), + path.Join(artifactFolder, "artifact6.tar.gz"), + }, + createPause: time.Second * 1, + ttl: time.Second*5 + 
time.Millisecond*500, + totalCountLimit: 10, + maxItemsToBeRetained: 4, + wantDeleted: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + }, + }, + { + name: "delete files based on ttl and maxItemsToBeRetained and totalCountLimit", + artifactPaths: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + path.Join(artifactFolder, "artifact3.tar.gz"), + path.Join(artifactFolder, "artifact4.tar.gz"), + path.Join(artifactFolder, "artifact5.tar.gz"), + path.Join(artifactFolder, "artifact6.tar.gz"), + }, + createPause: time.Millisecond * 500, + ttl: time.Millisecond * 500, + totalCountLimit: 3, + maxItemsToBeRetained: 2, + wantDeleted: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + path.Join(artifactFolder, "artifact3.tar.gz"), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + dir, err := os.MkdirTemp("", "") + g.Expect(err).ToNot(HaveOccurred()) + t.Cleanup(func() { os.RemoveAll(dir) }) + + s, err := NewStorage(dir, "hostname", tt.ttl, tt.maxItemsToBeRetained) + g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage") + + artifact := sourcev1.Artifact{ + Path: tt.artifactPaths[len(tt.artifactPaths)-1], + } + g.Expect(os.MkdirAll(path.Join(dir, artifactFolder), 0o755)).ToNot(HaveOccurred()) + for _, artifactPath := range tt.artifactPaths { + f, err := os.Create(path.Join(dir, artifactPath)) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(f.Close()).ToNot(HaveOccurred()) + time.Sleep(tt.createPause) + } + + deletedPaths, err := s.getGarbageFiles(artifact, tt.totalCountLimit, tt.maxItemsToBeRetained, tt.ttl) + g.Expect(err).ToNot(HaveOccurred(), "failed to collect garbage files") + g.Expect(len(tt.wantDeleted)).To(Equal(len(deletedPaths))) + for _, wantDeletedPath := range tt.wantDeleted { + present := false + for _, deletedPath := 
range deletedPaths { + if strings.Contains(deletedPath, wantDeletedPath) { + present = true + break + } + } + if !present { + g.Fail(fmt.Sprintf("expected file to be deleted, still exists: %s", wantDeletedPath)) + } + } + }) + } +} + +func TestStorage_GarbageCollect(t *testing.T) { + artifactFolder := path.Join("foo", "bar") + tests := []struct { + name string + artifactPaths []string + wantDeleted []string + wantErr string + ctxTimeout time.Duration + }{ + { + name: "garbage collects", + artifactPaths: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + path.Join(artifactFolder, "artifact3.tar.gz"), + path.Join(artifactFolder, "artifact4.tar.gz"), + }, + wantDeleted: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + }, + ctxTimeout: time.Second * 1, + }, + { + name: "garbage collection fails with context timeout", + artifactPaths: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + path.Join(artifactFolder, "artifact3.tar.gz"), + path.Join(artifactFolder, "artifact4.tar.gz"), + }, + wantErr: "context deadline exceeded", + ctxTimeout: time.Nanosecond * 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + dir, err := os.MkdirTemp("", "") + g.Expect(err).ToNot(HaveOccurred()) + t.Cleanup(func() { os.RemoveAll(dir) }) + + s, err := NewStorage(dir, "hostname", time.Second*2, 2) + g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage") + + artifact := sourcev1.Artifact{ + Path: tt.artifactPaths[len(tt.artifactPaths)-1], + } + g.Expect(os.MkdirAll(path.Join(dir, artifactFolder), 0o755)).ToNot(HaveOccurred()) + for i, artifactPath := range tt.artifactPaths { + f, err := os.Create(path.Join(dir, artifactPath)) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(f.Close()).ToNot(HaveOccurred()) + if i != len(tt.artifactPaths)-1 { + 
time.Sleep(time.Second * 1)
+			}
+		}
+
+			deletedPaths, err := s.GarbageCollect(context.TODO(), artifact, tt.ctxTimeout)
+			if tt.wantErr == "" {
+				g.Expect(err).ToNot(HaveOccurred(), "failed to collect garbage files")
+			} else {
+				g.Expect(err).To(HaveOccurred())
+				g.Expect(err.Error()).To(ContainSubstring(tt.wantErr))
+			}
+			if len(tt.wantDeleted) > 0 {
+				g.Expect(deletedPaths).To(HaveLen(len(tt.wantDeleted)))
+				for _, wantDeletedPath := range tt.wantDeleted {
+					present := false
+					for _, deletedPath := range deletedPaths {
+						if strings.Contains(deletedPath, wantDeletedPath) {
+							g.Expect(deletedPath).ToNot(BeAnExistingFile())
+							present = true
+							break
+						}
+					}
+					if !present {
+						g.Fail(fmt.Sprintf("expected file to be deleted, still exists: %s", wantDeletedPath))
+					}
+				}
+			}
+		})
+	}
+}
diff --git a/controllers/suite_test.go b/controllers/suite_test.go
index d61015b91..b495cbc27 100644
--- a/controllers/suite_test.go
+++ b/controllers/suite_test.go
@@ -46,8 +46,10 @@ import (
 // Gomega.
const ( - timeout = 10 * time.Second - interval = 1 * time.Second + timeout = 10 * time.Second + interval = 1 * time.Second + retentionTTL = 2 * time.Second + retentionRecords = 2 ) var ( @@ -181,7 +183,7 @@ func initTestTLS() { } func newTestStorage(s *testserver.HTTPServer) (*Storage, error) { - storage, err := NewStorage(s.Root(), s.URL(), timeout) + storage, err := NewStorage(s.Root(), s.URL(), retentionTTL, retentionRecords) if err != nil { return nil, err } diff --git a/main.go b/main.go index 98571d099..186577a62 100644 --- a/main.go +++ b/main.go @@ -73,25 +73,27 @@ func init() { func main() { var ( - metricsAddr string - eventsAddr string - healthAddr string - storagePath string - storageAddr string - storageAdvAddr string - concurrent int - requeueDependency time.Duration - watchAllNamespaces bool - helmIndexLimit int64 - helmChartLimit int64 - helmChartFileLimit int64 - clientOptions client.Options - logOptions logger.Options - leaderElectionOptions leaderelection.Options - helmCacheMaxSize int - helmCacheTTL string - helmCachePurgeInterval string - kexAlgos []string + metricsAddr string + eventsAddr string + healthAddr string + storagePath string + storageAddr string + storageAdvAddr string + concurrent int + requeueDependency time.Duration + watchAllNamespaces bool + helmIndexLimit int64 + helmChartLimit int64 + helmChartFileLimit int64 + clientOptions client.Options + logOptions logger.Options + leaderElectionOptions leaderelection.Options + helmCacheMaxSize int + helmCacheTTL string + helmCachePurgeInterval string + kexAlgos []string + artifactRetentionTTL time.Duration + artifactRetentionRecords int ) flag.StringVar(&metricsAddr, "metrics-addr", envOrDefault("METRICS_ADDR", ":8080"), @@ -124,6 +126,10 @@ func main() { "The interval at which the cache is purged. 
Valid time units are ns, us (or µs), ms, s, m, h.") flag.StringSliceVar(&kexAlgos, "ssh-kex-algos", []string{}, "The list of key exchange algorithms to use for ssh connections, arranged from most preferred to the least.") + flag.DurationVar(&artifactRetentionTTL, "artifact-retention-ttl", 60*time.Second, + "The duration of time that artifacts will be kept in storage before being garbage collected.") + flag.IntVar(&artifactRetentionRecords, "artifact-retention-records", 2, + "The maximum number of artifacts to be kept in storage after a garbage collection.") clientOptions.BindFlags(flag.CommandLine) logOptions.BindFlags(flag.CommandLine) @@ -177,7 +183,7 @@ func main() { if storageAdvAddr == "" { storageAdvAddr = determineAdvStorageAddr(storageAddr, setupLog) } - storage := mustInitStorage(storagePath, storageAdvAddr, setupLog) + storage := mustInitStorage(storagePath, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords, setupLog) setPreferredKexAlgos(kexAlgos) if err = (&controllers.GitRepositoryReconciler{ @@ -283,14 +289,14 @@ func startFileServer(path string, address string, l logr.Logger) { } } -func mustInitStorage(path string, storageAdvAddr string, l logr.Logger) *controllers.Storage { +func mustInitStorage(path string, storageAdvAddr string, artifactRetentionTTL time.Duration, artifactRetentionRecords int, l logr.Logger) *controllers.Storage { if path == "" { p, _ := os.Getwd() path = filepath.Join(p, "bin") os.MkdirAll(path, 0777) } - storage, err := controllers.NewStorage(path, storageAdvAddr, 5*time.Minute) + storage, err := controllers.NewStorage(path, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords) if err != nil { l.Error(err, "unable to initialise storage") os.Exit(1) diff --git a/tests/fuzz/gitrepository_fuzzer.go b/tests/fuzz/gitrepository_fuzzer.go index a81ecdc4c..d96877d40 100644 --- a/tests/fuzz/gitrepository_fuzzer.go +++ b/tests/fuzz/gitrepository_fuzzer.go @@ -174,7 +174,7 @@ func startEnvServer(setupReconcilers 
func(manager.Manager)) *envtest.Environment panic(err) } defer os.RemoveAll(tmpStoragePath) - storage, err = controllers.NewStorage(tmpStoragePath, "localhost:5050", time.Second*30) + storage, err = controllers.NewStorage(tmpStoragePath, "localhost:5050", time.Minute*1, 2) if err != nil { panic(err) }