From 5f69d01c8ae910f389b04ae0d086ae7e4040d335 Mon Sep 17 00:00:00 2001 From: David Cook Date: Wed, 12 Jun 2024 09:52:07 -0500 Subject: [PATCH 1/3] Optionally delete stale distributed state --- README.md | 4 +++- caddyfile.go | 14 ++++++++++++++ distributed.go | 14 ++++++++++++++ distributed_test.go | 3 ++- 4 files changed, 33 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index edc4a4c..2920a8c 100644 --- a/README.md +++ b/README.md @@ -88,7 +88,8 @@ This is an HTTP handler module, so it can be used wherever `http.handlers` modul } "distributed": { "write_interval": "", - "read_interval": "" + "read_interval": "", + "purge_age": "" }, } ``` @@ -130,6 +131,7 @@ rate_limit { distributed { read_interval write_interval + purge_age } storage jitter diff --git a/caddyfile.go b/caddyfile.go index 1fa2c39..acba24c 100644 --- a/caddyfile.go +++ b/caddyfile.go @@ -46,6 +46,7 @@ func parseCaddyfile(helper httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, e // distributed { // read_interval // write_interval +// purge_age // } // storage // jitter @@ -150,6 +151,19 @@ func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { return d.Errf("invalid write interval '%s': %v", d.Val(), err) } h.Distributed.WriteInterval = caddy.Duration(interval) + + case "purge_age": + if !d.NextArg() { + return d.ArgErr() + } + if h.Distributed.PurgeAge != 0 { + return d.Errf("purge age already specified: %v", h.Distributed.PurgeAge) + } + age, err := caddy.ParseDuration(d.Val()) + if err != nil { + return d.Errf("invalid purge age '%s': %v", d.Val(), err) + } + h.Distributed.PurgeAge = caddy.Duration(age) } } diff --git a/distributed.go b/distributed.go index 6258880..16a6aa7 100644 --- a/distributed.go +++ b/distributed.go @@ -46,6 +46,10 @@ type DistributedRateLimiting struct { // Default: 5s ReadInterval caddy.Duration `json:"read_interval,omitempty"` + // How long to wait before deleting stale states from other instances. + // Default: never + PurgeAge caddy.Duration `json:"purge_age,omitempty"` + instanceID string otherStates []rlState @@ -153,6 +157,16 @@ func (h Handler) syncDistributedRead(ctx context.Context) error { continue } + if h.Distributed.PurgeAge != 0 && state.Timestamp.Before(now().Add(-time.Duration(h.Distributed.PurgeAge))) { + err = h.storage.Delete(ctx, instanceFile) + if err != nil { + h.logger.Error("cannot delete rate limiter state file", + zap.String("key", instanceFile), + zap.Error(err)) + } + continue + } + otherStates = append(otherStates, state) } diff --git a/distributed_test.go b/distributed_test.go index 9e39697..f464049 100644 --- a/distributed_test.go +++ b/distributed_test.go @@ -158,7 +158,8 @@ func TestDistributed(t *testing.T) { }, "distributed": { "write_interval": "3600s", - "read_interval": "3600s" + "read_interval": "3600s", + "purge_age": "7200s" } }, { From fdd7f016c9c2061fbe69f7314101f0a031fc64c7 Mon Sep 17 00:00:00 2001 From: David Cook Date: Wed, 12 Jun 2024 14:00:32 -0500 Subject: [PATCH 2/3] Add unit test to confirm files are deleted --- distributed_test.go | 83 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 77 insertions(+), 6 deletions(-) diff --git a/distributed_test.go b/distributed_test.go index f464049..03ae172 100644 --- a/distributed_test.go +++ b/distributed_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "os" + "path" "strings" "testing" "time" @@ -26,18 +27,23 @@ import ( "github.com/caddyserver/caddy/v2/caddytest" "github.com/caddyserver/certmagic" "github.com/google/uuid" + "go.uber.org/zap" ) -func TestDistributed(t *testing.T) { - initTime() - window := 60 - maxEvents := 10 - +func ensureAppDataDir(t *testing.T) { // Make sure AppDataDir exists, because otherwise the caddytest.Tester won't // be able to generate an instance ID if err := os.MkdirAll(caddy.AppDataDir(), 0700); err != nil { t.Fatalf("failed to create app data dir %s: %s", caddy.AppDataDir(), err) } +} + +func TestDistributed(t *testing.T) { + initTime() + window := 60 + maxEvents := 10 + + ensureAppDataDir(t) testCases := []struct { name string @@ -85,7 +91,7 @@ func TestDistributed(t *testing.T) { t.Run(testCase.name, func(t *testing.T) { storageDir := t.TempDir() // Use a random UUID as the zone so that rate limits from multiple test runs - // collide with each other + // don't collide with each other zone := uuid.New().String() // To simulate a peer in a rate limiting cluster, constuct a @@ -190,3 +196,68 @@ func TestDistributed(t *testing.T) { }) } } + +func TestPurgeDistributedState(t *testing.T) { + initTime() + ensureAppDataDir(t) + logger, err := zap.NewDevelopment() + if err != nil { + t.Fatalf("failed to create logger: %s", err) + } + + storageDir := t.TempDir() + storage := certmagic.FileStorage{ + Path: storageDir, + } + + // Seed the storage directory with a rate limit state file from another instance. + otherRlState := rlState{ + Timestamp: now(), + Zones: make(map[string]map[string]rlStateValue, 0), + } + if err := writeRateLimitState(context.Background(), otherRlState, "12345678-1234-1234-1234-123456789abc", &storage); err != nil { + t.Fatalf("failed to write state to storage: %s", err) + } + + handler := Handler{ + Distributed: &DistributedRateLimiting{ + instanceID: "99999999-9999-9999-9999-999999999999", + PurgeAge: caddy.Duration(time.Hour), + }, + storage: &storage, + logger: logger, + } + + // Perform initial read, and confirm it picks up the existing state file. + err = handler.syncDistributedRead(context.Background()) + if err != nil { + t.Fatalf("reading distributed state failed: %s", err) + } + if len(handler.Distributed.otherStates) != 1 { + t.Fatalf("did not read other states correctly: %v", handler.Distributed.otherStates) + } + dirEntries, err := os.ReadDir(path.Join(storageDir, "rate_limit", "instances")) + if err != nil { + t.Fatalf("couldn't list directory: %s", err) + } + if len(dirEntries) != 1 { + t.Fatalf("wrong number of files present in storage directory: %v", dirEntries) + } + + // Advance time and sync again. The old state file should be deleted now. + advanceTime(2 * 60 * 60) + err = handler.syncDistributedRead(context.Background()) + if err != nil { + t.Fatalf("reading distributed state failed: %s", err) + } + if len(handler.Distributed.otherStates) != 0 { + t.Fatalf("expected other state to be deleted: %v", handler.Distributed.otherStates) + } + dirEntries, err = os.ReadDir(path.Join(storageDir, "rate_limit", "instances")) + if err != nil { + t.Fatalf("couldn't list directory: %s", err) + } + if len(dirEntries) != 0 { + t.Fatalf("storage directory was not empty: %v", dirEntries) + } +} From 7d28c01ef7ea23847e09115f704c9983a3ad5a4e Mon Sep 17 00:00:00 2001 From: David Cook Date: Wed, 12 Jun 2024 14:03:00 -0500 Subject: [PATCH 3/3] Review feedback --- distributed.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed.go b/distributed.go index 16a6aa7..d44eeb0 100644 --- a/distributed.go +++ b/distributed.go @@ -160,7 +160,7 @@ func (h Handler) syncDistributedRead(ctx context.Context) error { if h.Distributed.PurgeAge != 0 && state.Timestamp.Before(now().Add(-time.Duration(h.Distributed.PurgeAge))) { err = h.storage.Delete(ctx, instanceFile) if err != nil { - h.logger.Error("cannot delete rate limiter state file", + h.logger.Error("cannot delete stale rate limiter state file", zap.String("key", instanceFile), zap.Error(err)) }