From f8941193c6fc34c84628deca269f2dc56639c094 Mon Sep 17 00:00:00 2001 From: Steven Hartland Date: Sat, 27 Jul 2024 20:52:21 +0100 Subject: [PATCH] feat: shutdown race resilience A significant rewrite to ensure that we don't suffer from shutdown race conditions as the prune condition is met and additional resources are being created. Previously this would remove resources that were still in use, now we retry if we detect new resources have been created within a window of the prune condition triggering. This supports the following new environment configuration settings: - RYUK_REMOVE_RETRIES - The number of times to retry removing a resource. - RYUK_REQUEST_TIMEOUT - The timeout for any Docker requests. - RYUK_RETRY_OFFSET - The offset added to the start time of the prune pass that is used as the minimum resource creation time. - RYUK_SHUTDOWN_TIMEOUT - The duration after shutdown has been requested when the remaining connections are ignored and prune checks start. Update README to correct example, as health is only valid for containers not the other resources, so would cause failures.# Bump version of golangci-lint so it supports range over int. --- .github/workflows/golangci-lint.yml | 2 +- .gitignore | 6 + README.md | 44 +- config.go | 66 +++ config_test.go | 80 ++++ consts.go | 18 + go.mod | 56 +-- go.sum | 159 +++---- interfaces.go | 25 ++ linux/Dockerfile | 2 +- main.go | 357 +-------------- main_test.go | 400 ----------------- mock_test.go | 67 +++ reaper.go | 661 ++++++++++++++++++++++++++++ reaper_test.go | 521 ++++++++++++++++++++++ windows/Dockerfile | 2 +- 16 files changed, 1562 insertions(+), 904 deletions(-) create mode 100644 config.go create mode 100644 config_test.go create mode 100644 consts.go create mode 100644 interfaces.go delete mode 100644 main_test.go create mode 100644 mock_test.go create mode 100644 reaper.go create mode 100644 reaper_test.go diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index 93aed7b..04d3563 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -22,5 +22,5 @@ jobs: - name: golangci-lint uses: golangci/golangci-lint-action@3cfe3a4abbb849e10058ce4af15d205b6da42804 # v4 with: - version: v1.55.2 + version: v1.60.3 args: --timeout=3m diff --git a/.gitignore b/.gitignore index dce3433..2f6d5c9 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,9 @@ vendor/ bin/ + +# Binary +moby-ryuk + +# VS Code +.vscode diff --git a/README.md b/README.md index 6ed1cf3..4781929 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,36 @@ # Moby Ryuk -This project helps you to remove containers/networks/volumes/images by given filter after specified delay. +This project helps you to remove containers, networks, volumes and images by given filter after specified delay. -# Usage +## Building + +To build the binary only run: + +```shell +go build +``` + +To build the Linux docker container as the latest tag: + +```shell +docker build -f linux/Dockerfile -t testcontainers/ryuk:latest . +``` + +## Usage 1. Start it: - $ RYUK_PORT=8080 ./bin/moby-ryuk - $ # You can also run it with Docker - $ docker run -v /var/run/docker.sock:/var/run/docker.sock -e RYUK_PORT=8080 -p 8080:8080 testcontainers/ryuk:0.6.0 + RYUK_PORT=8080 ./bin/moby-ryuk + # You can also run it with Docker + docker run -v /var/run/docker.sock:/var/run/docker.sock -e RYUK_PORT=8080 -p 8080:8080 testcontainers/ryuk:0.6.0 1. Connect via TCP: - $ nc localhost 8080 + nc localhost 8080 1. Send some filters: - label=testing=true&health=unhealthy + label=testing=true&label=testing.sessionid=mysession ACK label=something ACK @@ -37,7 +51,15 @@ This project helps you to remove containers/networks/volumes/images by given fil ## Ryuk configuration -- `RYUK_CONNECTION_TIMEOUT` - Environment variable that defines the timeout for Ryuk to receive the first connection (default: 60s). Value layout is described in [time.ParseDuration](https://golang.org/pkg/time/#ParseDuration) documentation. -- `RYUK_PORT` - Environment variable that defines the port where Ryuk will be bound to (default: 8080). -- `RYUK_RECONNECTION_TIMEOUT` - Environment variable that defines the timeout for Ryuk to reconnect to Docker (default: 10s). Value layout is described in [time.ParseDuration](https://golang.org/pkg/time/#ParseDuration) documentation. -- `RYUK_VERBOSE` - Environment variable that defines if Ryuk should print debug logs (default: false). +The following environment variables can be configured to change the behaviour: + +| Environment Variable | Default | Format | Description | +| - | - | - | - | +| `RYUK_CONNECTION_TIMEOUT` | `60s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration without receiving any connections which will trigger a shutdown | +| `RYUK_PORT` | `8080` | `uint16` | The port to listen on for connections | +| `RYUK_RECONNECTION_TIMEOUT` | `10s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration after the last connection closes which will trigger resource clean up and shutdown | +| `RYUK_REQUEST_TIMEOUT` | `10s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The timeout for any Docker requests | +| `RYUK_REMOVE_RETRIES` | `10` | `int` | The number of times to retry removing a resource | +| `RYUK_RETRY_OFFSET` | `-1s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The offset added to the start time of the prune pass that is used as the minimum resource creation time. Any resource created after this calculated time will trigger a retry to ensure in use resources are not removed | +| `RYUK_VERBOSE` | `false` | `bool` | Whether to enable verbose aka debug logging | +| `RYUK_SHUTDOWN_TIMEOUT` | `10m` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration after shutdown has been requested when the remaining connections are ignored and prune checks start | diff --git a/config.go b/config.go new file mode 100644 index 0000000..d594747 --- /dev/null +++ b/config.go @@ -0,0 +1,66 @@ +package main + +import ( + "fmt" + "log/slog" + "time" + + "github.com/caarlos0/env/v11" +) + +// config represents the configuration for the reaper. +type config struct { + // ConnectionTimeout is the duration without receiving any connections which will trigger a shutdown. + ConnectionTimeout time.Duration `env:"RYUK_CONNECTION_TIMEOUT" envDefault:"60s"` + + // ReconnectionTimeout is the duration after the last connection closes which will trigger + // resource clean up and shutdown. + ReconnectionTimeout time.Duration `env:"RYUK_RECONNECTION_TIMEOUT" envDefault:"10s"` + + // RequestTimeout is the timeout for any Docker requests. + RequestTimeout time.Duration `env:"RYUK_REQUEST_TIMEOUT" envDefault:"10s"` + + // RemoveRetries is the number of times to retry removing a resource. + RemoveRetries int `env:"RYUK_REMOVE_RETRIES" envDefault:"10"` + + // RetryOffset is the offset added to the start time of the prune pass that is + // used as the minimum resource creation time. Any resource created after this + // calculated time will trigger a retry to ensure in use resources are not removed. + RetryOffset time.Duration `env:"RYUK_RETRY_OFFSET" envDefault:"-1s"` + + // ShutdownTimeout is the maximum amount of time the reaper will wait + // for once signalled to shutdown before it terminates even if connections + // are still established. + ShutdownTimeout time.Duration `env:"RYUK_SHUTDOWN_TIMEOUT" envDefault:"10m"` + + // Port is the port to listen on for connections. + Port uint16 `env:"RYUK_PORT" envDefault:"8080"` + + // Verbose is whether to enable verbose aka debug logging. + Verbose bool `env:"RYUK_VERBOSE" envDefault:"false"` +} + +// LogAttrs returns the configuration as a slice of attributes. +func (c config) LogAttrs() []slog.Attr { + return []slog.Attr{ + slog.Duration("connection_timeout", c.ConnectionTimeout), + slog.Duration("reconnection_timeout", c.ReconnectionTimeout), + slog.Duration("request_timeout", c.RequestTimeout), + slog.Duration("shutdown_timeout", c.ShutdownTimeout), + slog.Int("remove_retries", c.RemoveRetries), + slog.Duration("retry_offset", c.RetryOffset), + slog.Int("port", int(c.Port)), + slog.Bool("verbose", c.Verbose), + } +} + +// loadConfig loads the configuration from the environment +// applying defaults where necessary. +func loadConfig() (*config, error) { + var cfg config + if err := env.Parse(&cfg); err != nil { + return nil, fmt.Errorf("parse env: %w", err) + } + + return &cfg, nil +} diff --git a/config_test.go b/config_test.go new file mode 100644 index 0000000..198fd65 --- /dev/null +++ b/config_test.go @@ -0,0 +1,80 @@ +package main + +import ( + "os" + "reflect" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// clearConfigEnv clears the environment variables for the config fields. +func clearConfigEnv(t *testing.T) { + t.Helper() + + var cfg config + typ := reflect.TypeOf(cfg) + for i := range typ.NumField() { + field := typ.Field(i) + if name := field.Tag.Get("env"); name != "" { + if os.Getenv(name) != "" { + t.Setenv(name, "") + } + } + } +} + +func Test_loadConfig(t *testing.T) { + tests := map[string]struct { + setEnv func(*testing.T) + expected config + }{ + "defaults": { + setEnv: clearConfigEnv, + expected: config{ + ConnectionTimeout: time.Minute, + Port: 8080, + ReconnectionTimeout: time.Second * 10, + RemoveRetries: 10, + RequestTimeout: time.Second * 10, + RetryOffset: -time.Second, + ShutdownTimeout: time.Minute * 10, + }, + }, + "custom": { + setEnv: func(t *testing.T) { + t.Helper() + + clearConfigEnv(t) + t.Setenv("RYUK_PORT", "1234") + t.Setenv("RYUK_CONNECTION_TIMEOUT", "2s") + t.Setenv("RYUK_RECONNECTION_TIMEOUT", "3s") + t.Setenv("RYUK_REQUEST_TIMEOUT", "4s") + t.Setenv("RYUK_REMOVE_RETRIES", "5") + t.Setenv("RYUK_RETRY_OFFSET", "-6s") + t.Setenv("RYUK_SHUTDOWN_TIMEOUT", "7s") + }, + expected: config{ + Port: 1234, + ConnectionTimeout: time.Second * 2, + ReconnectionTimeout: time.Second * 3, + RequestTimeout: time.Second * 4, + RemoveRetries: 5, + RetryOffset: -time.Second * 6, + ShutdownTimeout: time.Second * 7, + }, + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + if tc.setEnv != nil { + tc.setEnv(t) + } + + cfg, err := loadConfig() + require.NoError(t, err) + require.Equal(t, tc.expected, *cfg) + }) + } +} diff --git a/consts.go b/consts.go new file mode 100644 index 0000000..625d8c8 --- /dev/null +++ b/consts.go @@ -0,0 +1,18 @@ +package main + +const ( + // labelBase is the base label for testcontainers. + labelBase = "org.testcontainers" + + // ryukLabel is the label used to identify reaper containers. + ryukLabel = labelBase + ".ryuk" + + // fieldError is the log field key for errors. + fieldError = "error" + + // fieldAddress is the log field a client or listening address. + fieldAddress = "address" + + // fieldClients is the log field used for client counts. + fieldClients = "clients" +) diff --git a/go.mod b/go.mod index 83905d0..cc27cec 100644 --- a/go.mod +++ b/go.mod @@ -1,64 +1,46 @@ module github.com/testcontainers/moby-ryuk -go 1.21 +go 1.22 require ( + github.com/caarlos0/env/v11 v11.1.0 github.com/docker/docker v27.1.1+incompatible github.com/stretchr/testify v1.9.0 - github.com/testcontainers/testcontainers-go v0.33.0 - gopkg.in/matryer/try.v1 v1.0.0-20150601225556-312d2599e12e ) require ( - dario.cat/mergo v1.0.0 // indirect - github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect - github.com/cenkalti/backoff/v4 v4.2.1 // indirect - github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927 // indirect - github.com/containerd/containerd v1.7.18 // indirect github.com/containerd/log v0.1.0 // indirect - github.com/containerd/platforms v0.2.1 // indirect - github.com/cpuguy83/dockercfg v0.3.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/distribution/reference v0.6.0 // indirect github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/go-ole/go-ole v1.2.6 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/google/uuid v1.6.0 // indirect - github.com/klauspost/compress v1.17.4 // indirect - github.com/kr/text v0.2.0 // indirect - github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect - github.com/magiconair/properties v1.8.7 // indirect - github.com/matryer/try v0.0.0-20161228173917-9ac251b645a2 // indirect + github.com/kr/pretty v0.3.1 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect - github.com/moby/patternmatcher v0.6.0 // indirect - github.com/moby/sys/sequential v0.5.0 // indirect - github.com/moby/sys/user v0.1.0 // indirect github.com/moby/term v0.5.0 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect - github.com/shirou/gopsutil/v3 v3.23.12 // indirect - github.com/shoenig/go-m1cpu v0.1.6 // indirect - github.com/sirupsen/logrus v1.9.3 // indirect - github.com/tklauser/go-sysconf v0.3.12 // indirect - github.com/tklauser/numcpus v0.6.1 // indirect - github.com/yusufpapurcu/wmi v1.2.3 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect - go.opentelemetry.io/otel v1.24.0 // indirect - go.opentelemetry.io/otel/metric v1.24.0 // indirect - go.opentelemetry.io/otel/trace v1.24.0 // indirect - golang.org/x/crypto v0.24.0 // indirect - golang.org/x/net v0.26.0 // indirect - golang.org/x/sys v0.21.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect + github.com/rogpeppe/go-internal v1.12.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect + go.opentelemetry.io/otel v1.28.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0 // indirect + go.opentelemetry.io/otel/metric v1.28.0 // indirect + go.opentelemetry.io/otel/sdk v1.28.0 // indirect + go.opentelemetry.io/otel/trace v1.28.0 // indirect + golang.org/x/net v0.27.0 // indirect + golang.org/x/sys v0.22.0 // indirect + golang.org/x/time v0.5.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240725223205-93522f1f2a9f // indirect + google.golang.org/grpc v1.65.0 // indirect + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + gotest.tools/v3 v3.5.1 // indirect ) diff --git a/go.sum b/go.sum index 4e39ec5..ebc6abe 100644 --- a/go.sum +++ b/go.sum @@ -1,27 +1,14 @@ -dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= -dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= -github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU= -github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= -github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= -github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927 h1:SKI1/fuSdodxmNNyVBR8d7X/HuLnRpvvFO0AgyQk764= -github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927/go.mod h1:h/aW8ynjgkuj+NQRlZcDbAbM1ORAbXjXX77sX7T289U= -github.com/containerd/containerd v1.7.18 h1:jqjZTQNfXGoEaZdW1WwPU0RqSn1Bm2Ay/KJPUuO8nao= -github.com/containerd/containerd v1.7.18/go.mod h1:IYEk9/IO6wAPUz2bCMVUbsfXjzw5UNP5fLz4PsUygQ4= +github.com/caarlos0/env/v11 v11.1.0 h1:a5qZqieE9ZfzdvbbdhTalRrHT5vu/4V1/ad1Ka6frhI= +github.com/caarlos0/env/v11 v11.1.0/go.mod h1:LwgkYk1kDvfGpHthrWWLof3Ny7PezzFwS4QrsJdHTMo= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= -github.com/containerd/platforms v0.2.1 h1:zvwtM3rz2YHPQsF2CHYM8+KtB5dvhISiXh5ZpSBQv6A= -github.com/containerd/platforms v0.2.1/go.mod h1:XHCb+2/hzowdiut9rkudds9bE5yJ7npe7dG/wG+uFPw= -github.com/cpuguy83/dockercfg v0.3.1 h1:/FpZ+JaygUR/lZP2NlFI2DVfrOEMAIKP5wWEJdoYe9E= -github.com/cpuguy83/dockercfg v0.3.1/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= -github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= @@ -35,44 +22,29 @@ github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= -github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= -github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= -github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= -github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= -github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= -github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= -github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= -github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= -github.com/matryer/try v0.0.0-20161228173917-9ac251b645a2 h1:JAEbJn3j/FrhdWA9jW8B5ajsLIjeuEHLi8xE4fk997o= -github.com/matryer/try v0.0.0-20161228173917-9ac251b645a2/go.mod h1:0KeJpeMD6o+O4hW7qJOT7vyQPKrWmj26uf5wMc/IiIs= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= -github.com/moby/patternmatcher v0.6.0 h1:GmP9lR19aU5GqSSFko+5pRqHi+Ohk1O69aFiKkVGiPk= -github.com/moby/patternmatcher v0.6.0/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YOUYxkhApJIxc= -github.com/moby/sys/sequential v0.5.0 h1:OPvI35Lzn9K04PBbCLW0g4LcFAJgHsvXsRyewg5lXtc= -github.com/moby/sys/sequential v0.5.0/go.mod h1:tH2cOOs5V9MlPiXcQzRC+eEyab644PWKGRYaaV5ZZlo= -github.com/moby/sys/user v0.1.0 h1:WmZ93f5Ux6het5iituh9x2zAG7NFY9Aqi49jjE1PaQg= -github.com/moby/sys/user v0.1.0/go.mod h1:fKJhFOnsCN6xZ5gSfbM6zaHGgDJMrqt9/reuj4T7MmU= github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= @@ -81,93 +53,63 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= -github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= -github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg= -github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= -github.com/shirou/gopsutil/v3 v3.23.12 h1:z90NtUkp3bMtmICZKpC4+WaknU1eXtp5vtbQ11DgpE4= -github.com/shirou/gopsutil/v3 v3.23.12/go.mod h1:1FrWgea594Jp7qmjHUUPlJDTPgcsb9mGnXDxavtikzM= -github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= -github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= -github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= -github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/testcontainers/testcontainers-go v0.33.0 h1:zJS9PfXYT5O0ZFXM2xxXfk4J5UMw/kRiISng037Gxdw= -github.com/testcontainers/testcontainers-go v0.33.0/go.mod h1:W80YpTa8D5C3Yy16icheD01UTDu+LmXIA2Keo+jWtT8= -github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= -github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= -github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= -github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= -github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= -go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= -go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 h1:Mne5On7VWdx7omSrSSZvM4Kw7cS7NQkOOmLcgscI51U= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0/go.mod h1:IPtUMKL4O3tH5y+iXVyAXqpAwMuzC1IrxVS81rummfE= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU= -go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= -go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= -go.opentelemetry.io/otel/sdk v1.19.0 h1:6USY6zH+L8uMH8L3t1enZPR3WFEmSTADlqldyHtJi3o= -go.opentelemetry.io/otel/sdk v1.19.0/go.mod h1:NedEbbS4w3C6zElbLdPJKOpJQOrGUJ+GfzpjUvI0v1A= -go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= -go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= -go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= -go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 h1:4K4tsIXefpVJtvA/8srF4V4y0akAoPHkIslgAkjixJA= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0/go.mod h1:jjdQuTGVsXV4vSs+CJ2qYDeDPf9yIJV23qlIzBm73Vg= +go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= +go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 h1:3Q/xZUyC1BBkualc9ROb4G8qkH90LXEIICcs5zv1OYY= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0/go.mod h1:s75jGIWA9OfCMzF0xr+ZgfrB5FEbbV7UuYo32ahUiFI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0 h1:j9+03ymgYhPKmeXGk5Zu+cIZOlVzd9Zv7QIiyItjFBU= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0/go.mod h1:Y5+XiUG4Emn1hTfciPzGPJaSI+RpDts6BnCIir0SLqk= +go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= +go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= +go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE= +go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg= +go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= +go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= +go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= -golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= -golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= -golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA= -golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= -golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44= -golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= @@ -176,20 +118,17 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 h1:RFiFrvy37/mpSpdySBDrUdipW/dHwsRwh3J3+A9VgT4= -google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237/go.mod h1:Z5Iiy3jtmioajWHDGFk7CeugTyHtPvMHA4UTmUkyalE= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= -google.golang.org/grpc v1.64.1 h1:LKtvyfbX3UGVPFcGqJ9ItpVWW6oN/2XqTxfAnwRRXiA= -google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvyjeP0= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 h1:0+ozOGcrp+Y8Aq8TLNN2Aliibms5LEzsq99ZZmAGYm0= +google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094/go.mod h1:fJ/e3If/Q67Mj99hin0hMhiNyCRmt6BQ2aWIJshUSJw= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240725223205-93522f1f2a9f h1:RARaIm8pxYuxyNPbBQf5igT7XdOyCNtat1qAT2ZxjU4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240725223205-93522f1f2a9f/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= +google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= -gopkg.in/matryer/try.v1 v1.0.0-20150601225556-312d2599e12e h1:bJHzu9Qwc9wQRWJ/WVkJGAfs+riucl/tKAFNxf9pzqk= -gopkg.in/matryer/try.v1 v1.0.0-20150601225556-312d2599e12e/go.mod h1:tve0rTLdGlwnXF7iBO9rbAEyeXvuuPx0n4DvXS/Nw7o= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= diff --git a/interfaces.go b/interfaces.go new file mode 100644 index 0000000..44048d6 --- /dev/null +++ b/interfaces.go @@ -0,0 +1,25 @@ +package main + +import ( + "context" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/image" + "github.com/docker/docker/api/types/network" + "github.com/docker/docker/api/types/volume" +) + +// dockerClient is an interface that represents the reapers required Docker methods. +type dockerClient interface { + ContainerList(ctx context.Context, options container.ListOptions) ([]types.Container, error) + ContainerRemove(ctx context.Context, containerID string, options container.RemoveOptions) error + ImageList(ctx context.Context, options image.ListOptions) ([]image.Summary, error) + ImageRemove(ctx context.Context, imageID string, options image.RemoveOptions) ([]image.DeleteResponse, error) + NetworkList(ctx context.Context, options network.ListOptions) ([]network.Summary, error) + NetworkRemove(ctx context.Context, networkID string) error + VolumeList(ctx context.Context, options volume.ListOptions) (volume.ListResponse, error) + VolumeRemove(ctx context.Context, volumeID string, force bool) error + Ping(ctx context.Context) (types.Ping, error) + NegotiateAPIVersion(ctx context.Context) +} diff --git a/linux/Dockerfile b/linux/Dockerfile index 217be7e..cdce2c7 100644 --- a/linux/Dockerfile +++ b/linux/Dockerfile @@ -1,7 +1,7 @@ # ----------- # Build Image # ----------- -FROM golang:1.21-alpine3.19 AS build +FROM golang:1.22-alpine3.19 AS build # Go build env ENV CGO_ENABLED=0 diff --git a/main.go b/main.go index 673e2d5..6a8f591 100644 --- a/main.go +++ b/main.go @@ -1,364 +1,35 @@ +// Runs a container reaper that listens for connections and prunes resources based on the filters received. package main import ( - "bufio" "context" - "errors" - "flag" "fmt" - "io" - "log" - "net" - "net/url" + "log/slog" "os" "os/signal" - "strconv" - "strings" - "sync" "syscall" - "time" - - "github.com/docker/docker/api/types/container" - "github.com/docker/docker/api/types/filters" - "github.com/docker/docker/client" - "gopkg.in/matryer/try.v1" -) - -const ( - connectionTimeoutEnv string = "RYUK_CONNECTION_TIMEOUT" - portEnv string = "RYUK_PORT" - reconnectionTimeoutEnv string = "RYUK_RECONNECTION_TIMEOUT" - ryukLabel string = "org.testcontainers.ryuk" - verboseEnv string = "RYUK_VERBOSE" ) -var ( - port int - connectionTimeout time.Duration - reconnectionTimeout time.Duration - verbose bool -) - -type config struct { - Port int - ConnectionTimeout time.Duration - ReconnectionTimeout time.Duration - Verbose bool -} - -// newConfig parses command line flags and returns a parsed config. config.timeout -// can be set by environment variable, RYUK_CONNECTION_TIMEOUT. If an error occurs -// while parsing RYUK_CONNECTION_TIMEOUT the error is returned. -func newConfig(args []string) (*config, error) { - cfg := config{ - Port: 8080, - ConnectionTimeout: 60 * time.Second, - ReconnectionTimeout: 10 * time.Second, - Verbose: false, - } - - fs := flag.NewFlagSet("ryuk", flag.ExitOnError) - fs.SetOutput(os.Stdout) - - fs.IntVar(&cfg.Port, "p", 8080, "Deprecated: please use the "+portEnv+" environment variable to set the port to bind at") - - err := fs.Parse(args) - if err != nil { - return nil, err - } - - if timeout, ok := os.LookupEnv(connectionTimeoutEnv); ok { - parsedTimeout, err := time.ParseDuration(timeout) - if err != nil { - return nil, fmt.Errorf("failed to parse \"%s\": %s", connectionTimeoutEnv, err) - } - - cfg.ConnectionTimeout = parsedTimeout - } - - if port, ok := os.LookupEnv(portEnv); ok { - parsedPort, err := strconv.Atoi(port) - if err != nil { - return nil, fmt.Errorf("failed to parse \"%s\": %s", portEnv, err) - } - - cfg.Port = parsedPort - } - - if timeout, ok := os.LookupEnv(reconnectionTimeoutEnv); ok { - parsedTimeout, err := time.ParseDuration(timeout) - if err != nil { - return nil, fmt.Errorf("failed to parse \"%s\": %s", reconnectionTimeoutEnv, err) - } - - cfg.ReconnectionTimeout = parsedTimeout - } - - if verbose, ok := os.LookupEnv(verboseEnv); ok { - v, err := strconv.ParseBool(verbose) - if err != nil { - return nil, fmt.Errorf("failed to parse \"%s\": %s", verboseEnv, err) - } - - cfg.Verbose = v - } - - return &cfg, nil -} - -func main() { - cfg, err := newConfig(os.Args[1:]) - if err != nil { - panic(err) - } - - port = cfg.Port - connectionTimeout = cfg.ConnectionTimeout - reconnectionTimeout = cfg.ReconnectionTimeout - verbose = cfg.Verbose - - cli, err := client.NewClientWithOpts(client.FromEnv) - if err != nil { - panic(err) - } - - cli.NegotiateAPIVersion(context.Background()) - - pingCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - log.Println("Pinging Docker...") - _, err = cli.Ping(pingCtx) - if err != nil { - panic(err) - } - - log.Println("Docker daemon is available!") - - deathNote := sync.Map{} - - // Buffered so we don't block the main process. - connectionAccepted := make(chan net.Addr, 10) - connectionLost := make(chan net.Addr, 10) - - go processRequests(&deathNote, connectionAccepted, connectionLost) - +// run creates and runs a reaper which is cancelled when a signal is received. +func run() error { ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() - waitForPruneCondition(ctx, connectionAccepted, connectionLost) - - dc, dn, dv, di := prune(cli, &deathNote) - log.Printf("Removed %d container(s), %d network(s), %d volume(s) %d image(s)", dc, dn, dv, di) -} - -func processRequests(deathNote *sync.Map, connectionAccepted chan<- net.Addr, connectionLost chan<- net.Addr) { - log.Printf("Starting on port %d...", port) - - ln, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + r, err := newReaper(ctx) if err != nil { - panic(err) + return fmt.Errorf("new reaper: %w", err) } - log.Println("Started!") - for { - conn, err := ln.Accept() - if err != nil { - panic(err) - } - - connectionAccepted <- conn.RemoteAddr() - - go func(conn net.Conn) { - defer conn.Close() - defer func() { connectionLost <- conn.RemoteAddr() }() - - reader := bufio.NewReader(conn) - for { - message, err := reader.ReadString('\n') - - message = strings.TrimSpace(message) - - if len(message) > 0 { - query, err := url.ParseQuery(message) - - if err != nil { - log.Println(err) - continue - } - - args := filters.NewArgs() - for filterType, values := range query { - for _, value := range values { - args.Add(filterType, value) - } - } - paramBytes, err := args.MarshalJSON() - - if err != nil { - log.Println(err) - continue - } - param := string(paramBytes) - - log.Printf("Adding %s", param) - - deathNote.Store(param, true) - - _, _ = conn.Write([]byte("ACK\n")) - } - - if err != nil { - if !errors.Is(err, io.EOF) { - log.Println(err) - } - return - } - } - }(conn) + if err = r.run(ctx); err != nil { + return fmt.Errorf("run: %w", err) } -} -func waitForPruneCondition(ctx context.Context, connectionAccepted <-chan net.Addr, connectionLost <-chan net.Addr) { - connectionCount := 0 - timer := time.NewTimer(connectionTimeout) - for { - select { - case addr := <-connectionAccepted: - log.Printf("New client connected: %s", addr) - connectionCount++ - if connectionCount == 1 { - if !timer.Stop() { - <-timer.C - } - } - case addr := <-connectionLost: - log.Printf("Client disconnected: %s", addr.String()) - connectionCount-- - if connectionCount == 0 { - timer.Reset(reconnectionTimeout) - } - case <-ctx.Done(): - log.Println("Signal received") - return - case <-timer.C: - log.Println("Timeout waiting for connection") - return - } - } + return nil } -func prune(cli *client.Client, deathNote *sync.Map) (deletedContainers int, deletedNetworks int, deletedVolumes int, deletedImages int) { - deletedContainersMap := make(map[string]bool) - deletedNetworksMap := make(map[string]bool) - deletedVolumesMap := make(map[string]bool) - deletedImagesMap := make(map[string]bool) - - deathNote.Range(func(note, _ interface{}) bool { - param := fmt.Sprint(note) - if verbose { - log.Printf("Deleting %s\n", param) - } - - args, err := filters.FromJSON(param) - if err != nil { - log.Println(err) - return true - } - - containerListOpts := container.ListOptions{All: true, Filters: args} - if verbose { - log.Printf("Listing containers with filter: %#v\n", containerListOpts) - } - - if containers, err := cli.ContainerList(context.Background(), containerListOpts); err != nil { - log.Println(err) - } else { - containerRemoveOpts := container.RemoveOptions{RemoveVolumes: true, Force: true} - - for _, container := range containers { - value, isReaper := container.Labels[ryukLabel] - if isReaper && value == "true" { - continue - } - - if verbose { - log.Printf("Deleting containers with filter: %#v\n", containerRemoveOpts) - } - _ = cli.ContainerRemove(context.Background(), container.ID, containerRemoveOpts) - deletedContainersMap[container.ID] = true - } - } - - _ = try.Do(func(attempt int) (bool, error) { - if verbose { - log.Printf("Deleting networks with filter: %#v. (Attempt %d/%d)\n", args, attempt, 10) - } - - networksPruneReport, err := cli.NetworksPrune(context.Background(), args) - for _, networkID := range networksPruneReport.NetworksDeleted { - deletedNetworksMap[networkID] = true - } - shouldRetry := attempt < 10 - if err != nil && shouldRetry { - log.Printf("Network pruning has failed, retrying(%d/%d). The error was: %v", attempt, 10, err) - time.Sleep(1 * time.Second) - } - return shouldRetry, err - }) - - _ = try.Do(func(attempt int) (bool, error) { - argsClone := args.Clone() - - // The API version >= v1.42 prunes only anonymous volumes: https://github.com/moby/moby/releases/tag/v23.0.0. - if serverVersion, err := cli.ServerVersion(context.Background()); err == nil && serverVersion.APIVersion >= "1.42" { - argsClone.Add("all", "true") - } - - if verbose { - log.Printf("Deleting volumes with filter: %#v. (Attempt %d/%d)\n", argsClone, attempt, 10) - } - - volumesPruneReport, err := cli.VolumesPrune(context.Background(), argsClone) - for _, volumeName := range volumesPruneReport.VolumesDeleted { - deletedVolumesMap[volumeName] = true - } - shouldRetry := attempt < 10 - if err != nil && shouldRetry { - log.Printf("Volumes pruning has failed, retrying(%d/%d). The error was: %v", attempt, 10, err) - time.Sleep(1 * time.Second) - } - return shouldRetry, err - }) - - _ = try.Do(func(attempt int) (bool, error) { - argsClone := args.Clone() - argsClone.Add("dangling", "false") - - if verbose { - log.Printf("Deleting images with filter: %#v. (Attempt %d/%d)\n", argsClone, attempt, 10) - } - - imagesPruneReport, err := cli.ImagesPrune(context.Background(), argsClone) - for _, image := range imagesPruneReport.ImagesDeleted { - if image.Untagged != "" { - deletedImagesMap[image.Untagged] = true - } - } - shouldRetry := attempt < 10 - if err != nil && shouldRetry { - log.Printf("Images pruning has failed, retrying(%d/%d). The error was: %v", attempt, 10, err) - time.Sleep(1 * time.Second) - } - return shouldRetry, err - }) - - return true - }) - - deletedContainers = len(deletedContainersMap) - deletedNetworks = len(deletedNetworksMap) - deletedVolumes = len(deletedVolumesMap) - deletedImages = len(deletedImagesMap) - return +func main() { + if err := run(); err != nil { + slog.Error("run", fieldError, err) + os.Exit(1) + } } diff --git a/main_test.go b/main_test.go deleted file mode 100644 index f5a7a9e..0000000 --- a/main_test.go +++ /dev/null @@ -1,400 +0,0 @@ -package main - -import ( - "archive/tar" - "bytes" - "context" - "fmt" - "io" - "log" - "net" - "os" - "path/filepath" - "sync" - "testing" - "time" - - "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/volume" - "github.com/docker/docker/client" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/testcontainers/testcontainers-go" - "github.com/testcontainers/testcontainers-go/network" -) - -var addr = &net.TCPAddr{ - IP: net.IPv4zero, - Port: 5555, - Zone: "", -} - -var testConnectionTimeout time.Duration = 5 * time.Second - -func init() { - reconnectionTimeout = 1 * time.Second -} - -func TestReconnectionTimeout(t *testing.T) { - // reset connectionTimeout - connectionTimeout = testConnectionTimeout - - acc := make(chan net.Addr) - lost := make(chan net.Addr) - - done := make(chan struct{}) - - go func() { - waitForPruneCondition(context.Background(), acc, lost) - done <- struct{}{} - }() - - acc <- addr - lost <- addr - - select { - case <-done: - return - case <-time.After(2 * time.Second): - t.Fail() - } -} - -func TestInitialTimeout(t *testing.T) { - // reset connectionTimeout - connectionTimeout = testConnectionTimeout - - origWriter := log.Default().Writer() - defer func() { - log.SetOutput(origWriter) - }() - var buf bytes.Buffer - log.SetOutput(&buf) - - acc := make(chan net.Addr) - lost := make(chan net.Addr) - done := make(chan string) - - go func() { - waitForPruneCondition(context.Background(), acc, lost) - done <- buf.String() - }() - - select { - case p := <-done: - require.Contains(t, p, "Timeout waiting for connection") - case <-time.After(7 * time.Second): - t.Fatal("Timeout waiting prune condition") - } -} - -func TestPrune(t *testing.T) { - tcCli, err := testcontainers.NewDockerClientWithOpts(context.Background(), client.FromEnv) - tcCli.NegotiateAPIVersion(context.Background()) - - cli := tcCli.Client - - if err == nil { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - _, err = cli.Ping(ctx) - } - - if err != nil { - t.Fatal(err) - } - cli.NegotiateAPIVersion(context.Background()) - - maxLength := 25 - - t.Run("Empty death note", func(t *testing.T) { - deathNote := &sync.Map{} - - dc, dn, dv, di := prune(cli, deathNote) - assert.Equal(t, 0, dc) - assert.Equal(t, 0, dn) - assert.Equal(t, 0, dv) - assert.Equal(t, 0, di) - }) - - t.Run("Malformed death note", func(t *testing.T) { - deathNote := &sync.Map{} - deathNote.Store("param", true) - - dc, dn, dv, di := prune(cli, deathNote) - assert.Equal(t, 0, dc) - assert.Equal(t, 0, dn) - assert.Equal(t, 0, dv) - assert.Equal(t, 0, di) - }) - - t.Run("Malformed JSON death note", func(t *testing.T) { - deathNote := &sync.Map{} - deathNote.Store(`{"label": "color"}`, true) - - dc, dn, dv, di := prune(cli, deathNote) - assert.Equal(t, 0, dc) - assert.Equal(t, 0, dn) - assert.Equal(t, 0, dv) - assert.Equal(t, 0, di) - }) - - t.Run("Death note removing containers", func(t *testing.T) { - const label = "removable-container" - deathNote := &sync.Map{} - deathNote.Store(`{"label": {"`+label+`=true": true}}`, true) - - ctx := context.Background() - for i := 0; i < maxLength; i++ { - c, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Image: "nginx:alpine", - Labels: map[string]string{ - label: "true", - }, - SkipReaper: true, - }, - Started: true, - }) - require.Nil(t, err) - require.NotNil(t, c) - t.Cleanup(func() { - require.Error(t, c.Terminate(ctx), "container should have been removed") - }) - } - - dc, dn, dv, di := prune(cli, deathNote) - assert.Equal(t, maxLength, dc) - assert.Equal(t, 0, dn) - assert.Equal(t, 0, dv) - assert.Equal(t, 0, di) - }) - - t.Run("Death note skips reaper container itself", func(t *testing.T) { - const label = "removable-container" - deathNote := &sync.Map{} - deathNote.Store(`{"label": {"`+label+`=true": true}}`, true) - - ctx := context.Background() - c, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Image: "nginx:alpine", - Labels: map[string]string{ - label: "true", - "org.testcontainers.ryuk": "true", - }, - SkipReaper: true, - }, - Started: true, - }) - require.Nil(t, err) - require.NotNil(t, c) - - dc, _, _, _ := prune(cli, deathNote) - assert.Equal(t, 0, dc) - - err = c.Terminate(ctx) - require.Nil(t, err) - }) - - t.Run("Death note removing networks", func(t *testing.T) { - const label = "removable-network" - deathNote := &sync.Map{} - deathNote.Store(`{"label": {"`+label+`=true": true}}`, true) - - ctx := context.Background() - for i := 0; i < maxLength; i++ { - nw, err := network.New(ctx, network.WithLabels(map[string]string{label: "true"})) - require.Nil(t, err) - require.NotNil(t, nw) - t.Cleanup(func() { - require.Error(t, nw.Remove(ctx), "network should have been removed") - }) - } - - dc, dn, dv, di := prune(cli, deathNote) - assert.Equal(t, 0, dc) - assert.Equal(t, maxLength, dn) - assert.Equal(t, 0, dv) - assert.Equal(t, 0, di) - }) - - t.Run("Death note removing volumes", func(t *testing.T) { - const label = "removable-volume" - deathNote := &sync.Map{} - deathNote.Store(`{"label": {"`+label+`=true": true}}`, true) - - ctx := context.Background() - for i := 0; i < maxLength; i++ { - opts := volume.CreateOptions{ - Name: fmt.Sprintf("volume-%d", i), - Labels: map[string]string{ - label: "true", - }, - } - - vol, err := cli.VolumeCreate(ctx, opts) - require.Nil(t, err) - require.NotNil(t, vol) - t.Cleanup(func() { - // force remove the volume, which does not fail if the volume was already removed - require.NoError(t, cli.VolumeRemove(ctx, vol.Name, true), "volume should have been removed") - }) - } - - dc, dn, dv, di := prune(cli, deathNote) - assert.Equal(t, 0, dc) - assert.Equal(t, 0, dn) - assert.Equal(t, maxLength, dv) - assert.Equal(t, 0, di) - }) - - t.Run("Death note removing images", func(t *testing.T) { - const label = "removable-image" - deathNote := &sync.Map{} - deathNote.Store(`{"label": {"`+label+`=true": true}}`, true) - - ctx := context.Background() - for i := 0; i < maxLength; i++ { - buf := new(bytes.Buffer) - tw := tar.NewWriter(buf) - defer tw.Close() - - dockerFile := "Dockerfile" - dockerFileReader, err := os.Open(filepath.Join("testresources", dockerFile)) - require.Nil(t, err) - - readDockerFile, err := io.ReadAll(dockerFileReader) - require.Nil(t, err) - - tarHeader := &tar.Header{ - Name: dockerFile, - Size: int64(len(readDockerFile)), - } - err = tw.WriteHeader(tarHeader) - require.Nil(t, err) - - _, err = tw.Write(readDockerFile) - require.Nil(t, err) - dockerFileTarReader := bytes.NewReader(buf.Bytes()) - - opt := types.ImageBuildOptions{ - Remove: true, - ForceRemove: true, // removing containers produced by the build - Labels: map[string]string{ - label: "true", - "index": fmt.Sprintf("%d", i), - }, - Context: dockerFileTarReader, - Dockerfile: dockerFile, - Tags: []string{fmt.Sprintf("moby-ryuk:test-%d", i)}, // adding a tag so that image is not marked as 'dangling' - } - - response, err := cli.ImageBuild(ctx, dockerFileTarReader, opt) - require.Nil(t, err) - require.NotNil(t, response) - - // need to read the response from Docker before continuing the execution - buf = new(bytes.Buffer) - _, err = buf.ReadFrom(response.Body) - require.Nil(t, err) - - err = response.Body.Close() - require.Nil(t, err) - } - - dc, dn, dv, di := prune(cli, deathNote) - - assert.Equal(t, 0, dc) - assert.Equal(t, 0, dn) - assert.Equal(t, 0, dv) - assert.Equal(t, maxLength, di) - }) -} - -func Test_newConfig(t *testing.T) { - t.Run("should return an error when failing to parse RYUK_CONNECTION_TIMEOUT environment variable", func(t *testing.T) { - t.Setenv(connectionTimeoutEnv, "bad_value") - - config, err := newConfig([]string{}) - require.NotNil(t, err) - require.Nil(t, config) - }) - - t.Run("should set connectionTimeout with RYUK_CONNECTION_TIMEOUT environment variable", func(t *testing.T) { - t.Setenv(connectionTimeoutEnv, "10s") - - config, err := newConfig([]string{}) - require.Nil(t, err) - assert.Equal(t, 10*time.Second, config.ConnectionTimeout) - }) - - t.Run("should return an error when failing to parse RYUK_PORT environment variable", func(t *testing.T) { - t.Setenv(portEnv, "bad_value") - - config, err := newConfig([]string{}) - require.NotNil(t, err) - require.Nil(t, config) - }) - - t.Run("should set connectionTimeout with RYUK_PORT environment variable", func(t *testing.T) { - t.Setenv(portEnv, "8081") - - config, err := newConfig([]string{}) - require.Nil(t, err) - assert.Equal(t, 8081, config.Port) - }) - - t.Run("should return an error when failing to parse RYUK_RECONNECTION_TIMEOUT environment variable", func(t *testing.T) { - t.Setenv(reconnectionTimeoutEnv, "bad_value") - - config, err := newConfig([]string{}) - require.NotNil(t, err) - require.Nil(t, config) - }) - - t.Run("should set connectionTimeout with RYUK_RECONNECTION_TIMEOUT environment variable", func(t *testing.T) { - t.Setenv(reconnectionTimeoutEnv, "100s") - - config, err := newConfig([]string{}) - require.Nil(t, err) - assert.Equal(t, 100*time.Second, config.ReconnectionTimeout) - }) - - t.Run("should return an error when failing to parse RYUK_VERBOSE environment variable", func(t *testing.T) { - t.Setenv(verboseEnv, "bad_value") - - config, err := newConfig([]string{}) - require.NotNil(t, err) - require.Nil(t, config) - }) - - t.Run("should set verbose with RYUK_VERBOSE environment variable", func(t *testing.T) { - t.Setenv(verboseEnv, "true") - - config, err := newConfig([]string{}) - require.Nil(t, err) - assert.True(t, config.Verbose) - - t.Setenv(verboseEnv, "false") - - config, err = newConfig([]string{}) - require.Nil(t, err) - assert.False(t, config.Verbose) - }) - - t.Run("should set port with port flag", func(t *testing.T) { - config, err := newConfig([]string{"-p", "3000"}) - require.Nil(t, err) - assert.Equal(t, 3000, config.Port) - }) - - t.Run("should set port from env with port flag and RYUK_PORT environment variable", func(t *testing.T) { - t.Setenv(portEnv, "8081") - - config, err := newConfig([]string{"-p", "3000"}) - require.Nil(t, err) - assert.Equal(t, 8081, config.Port) - }) -} diff --git a/mock_test.go b/mock_test.go new file mode 100644 index 0000000..3d5a666 --- /dev/null +++ b/mock_test.go @@ -0,0 +1,67 @@ +package main + +import ( + "context" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/image" + "github.com/docker/docker/api/types/network" + "github.com/docker/docker/api/types/volume" + "github.com/stretchr/testify/mock" +) + +var _ dockerClient = (*mockClient)(nil) + +type mockClient struct { + mock.Mock +} + +func (c *mockClient) ContainerList(ctx context.Context, options container.ListOptions) ([]types.Container, error) { + args := c.Called(ctx, options) + return args.Get(0).([]types.Container), args.Error(1) +} + +func (c *mockClient) ContainerRemove(ctx context.Context, containerID string, options container.RemoveOptions) error { + args := c.Called(ctx, containerID, options) + return args.Error(0) +} + +func (c *mockClient) ImageList(ctx context.Context, options image.ListOptions) ([]image.Summary, error) { + args := c.Called(ctx, options) + return args.Get(0).([]image.Summary), args.Error(1) +} + +func (c *mockClient) ImageRemove(ctx context.Context, imageID string, options image.RemoveOptions) ([]image.DeleteResponse, error) { + args := c.Called(ctx, imageID, options) + return args.Get(0).([]image.DeleteResponse), args.Error(1) +} + +func (c *mockClient) NetworkList(ctx context.Context, options network.ListOptions) ([]network.Summary, error) { + args := c.Called(ctx, options) + return args.Get(0).([]network.Summary), args.Error(1) +} + +func (c *mockClient) NetworkRemove(ctx context.Context, networkID string) error { + args := c.Called(ctx, networkID) + return args.Error(0) +} + +func (c *mockClient) VolumeList(ctx context.Context, options volume.ListOptions) (volume.ListResponse, error) { + args := c.Called(ctx, options) + return args.Get(0).(volume.ListResponse), args.Error(1) +} + +func (c *mockClient) VolumeRemove(ctx context.Context, volumeID string, force bool) error { + args := c.Called(ctx, volumeID, force) + return args.Error(0) +} + +func (c *mockClient) Ping(ctx context.Context) (types.Ping, error) { + args := c.Called(ctx) + return args.Get(0).(types.Ping), args.Error(1) +} + +func (c *mockClient) NegotiateAPIVersion(ctx context.Context) { + c.Called(ctx) +} diff --git a/reaper.go b/reaper.go new file mode 100644 index 0000000..f8e2aeb --- /dev/null +++ b/reaper.go @@ -0,0 +1,661 @@ +package main + +import ( + "bufio" + "context" + "errors" + "fmt" + "log/slog" + "net" + "net/url" + "os" + "sync" + "time" + + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/api/types/image" + "github.com/docker/docker/api/types/network" + "github.com/docker/docker/api/types/volume" + "github.com/docker/docker/client" + "github.com/docker/docker/errdefs" +) + +//nolint:gochecknoglobals // Reusable options are fine as globals. +var ( + // errChangesDetected is returned when changes are detected. + errChangesDetected = errors.New("changes detected") + + // containerRemoveOptions are the options we use to remove a container. + containerRemoveOptions = container.RemoveOptions{RemoveVolumes: true, Force: true} + + // imageRemoveOptions are the options we use to remove an image. + imageRemoveOptions = image.RemoveOptions{PruneChildren: true} + + // volumeRemoveForce is the force option we use to remove a volume. + volumeRemoveForce = true + + // ackResponse is the response we send to the client to acknowledge a filter. + ackResponse = []byte("ACK\n") +) + +// reaper listens for connections and prunes resources based on the filters received +// once a prune condition is met. +type reaper struct { + client dockerClient + listener net.Listener + cfg *config + connected chan string + disconnected chan string + shutdown chan struct{} + filters map[string]filters.Args + logger *slog.Logger + mtx sync.Mutex +} + +// reaperOption is a function that sets an option on a reaper. +type reaperOption func(*reaper) error + +// withConfig returns a reaperOption that sets the configuration. +// Default: loaded from the environment. +func withConfig(cfg config) reaperOption { + return func(r *reaper) error { + r.cfg = &cfg + return nil + } +} + +// withLogger returns a reaperOption that sets the logger. +// If specified the log level will not be changed by the +// configuration, so should be set to the desired level. +// Default: A text handler to stdout with the log level +// set by the configuration. +func withLogger(logger *slog.Logger) reaperOption { + return func(r *reaper) error { + r.logger = logger + return nil + } +} + +// withClient returns a reaperOption that sets the Docker client. +// Default: A docker client created with options from the environment. +func withClient(client dockerClient) reaperOption { + return func(r *reaper) error { + r.client = client + return nil + } +} + +// newReaper creates a new reaper with the specified options. +// Default options are used if not specified, see the individual +// options for details. +func newReaper(ctx context.Context, options ...reaperOption) (*reaper, error) { + logLevel := &slog.LevelVar{} + r := &reaper{ + filters: make(map[string]filters.Args), + connected: make(chan string), // Must be unbuffered to ensure correct behaviour. + disconnected: make(chan string), + shutdown: make(chan struct{}), + logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + Level: logLevel, + })), + } + + for _, option := range options { + if err := option(r); err != nil { + return nil, fmt.Errorf("option: %w", err) + } + } + + var err error + if r.client == nil { + // Default client configured from the environment. + if r.client, err = client.NewClientWithOpts(client.FromEnv); err != nil { + return nil, fmt.Errorf("new client: %w", err) + } + } + + r.client.NegotiateAPIVersion(ctx) + + if r.cfg == nil { + // Default configuration loaded from the environment. + if r.cfg, err = loadConfig(); err != nil { + return nil, fmt.Errorf("load config: %w", err) + } + } + + if r.cfg.Verbose { + logLevel.Set(slog.LevelDebug) + } + + pingCtx, cancel := context.WithTimeout(ctx, r.cfg.RequestTimeout) + defer cancel() + + if _, err = r.client.Ping(pingCtx); err != nil { + return nil, fmt.Errorf("ping: %w", err) + } + + r.logger.LogAttrs(ctx, slog.LevelInfo, "starting", r.cfg.LogAttrs()...) + if r.listener, err = net.Listen("tcp", fmt.Sprintf(":%d", r.cfg.Port)); err != nil { + return nil, fmt.Errorf("listen: %w", err) + } + + r.logger.Info("listening", fieldAddress, r.listener.Addr().String()) + + return r, nil +} + +// run starts the reaper which prunes resources when: +// - Signalled by the context +// - No connections are received within the connection timeout +// - A connection is received and no further connections are received within the reconnection timeout +func (r *reaper) run(ctx context.Context) error { + defer r.logger.Info("done") + + // Process incoming connections. + go r.processClients() + + // Wait for all tasks to complete. + if err := r.pruner(ctx); err != nil { + if errors.Is(err, context.Canceled) { + return nil + } + + return err + } + + return nil +} + +// pruner waits for a prune condition to be triggered then runs a prune. +func (r *reaper) pruner(ctx context.Context) error { + var errs []error + resources, err := r.pruneWait(ctx) + if err != nil { + errs = append(errs, fmt.Errorf("prune wait: %w", err)) + } + + if err = r.prune(resources); err != nil { //nolint:contextcheck // Prune needs its own context to ensure clean up completes. + errs = append(errs, fmt.Errorf("prune: %w", err)) + } + + return errors.Join(errs...) +} + +// processClients listens for incoming connections and processes them. +func (r *reaper) processClients() { + r.logger.Info("client processing started") + defer r.logger.Info("client processing stopped") + + for { + conn, err := r.listener.Accept() + if err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, net.ErrClosed) { + return + } + + r.logger.Error("accept", fieldError, err) + continue + } + + // Block waiting for the connection to be registered + // so that we prevent the race on connection count. + addr := conn.RemoteAddr().String() + select { + case r.connected <- addr: + case <-r.shutdown: + // We received a new connection after shutdown started. + // Closing without returning the ACK should trigger the caller + // to retry and get a new reaper. + r.logger.Warn("shutdown, aborting client", fieldAddress, addr) + conn.Close() + return + } + + go r.handle(conn) + } +} + +// handle processes a connection, reading session details from +// the client and adding them to our filter. +func (r *reaper) handle(conn net.Conn) { + addr := conn.RemoteAddr().String() + defer func() { + conn.Close() + r.disconnected <- addr + }() + + logger := r.logger.With(fieldAddress, addr) + + // Read filters from the client and add them to our list. + scanner := bufio.NewScanner(conn) + for scanner.Scan() { + msg := scanner.Text() + + if len(msg) == 0 { + logger.Warn("empty filter received") + continue + } + + if err := r.addFilter(msg); err != nil { + logger.Error("add filter", fieldError, err) + } + + if _, err := conn.Write(ackResponse); err != nil { + logger.Error("ack write", fieldError, err) + } + } + + if err := scanner.Err(); err != nil { + logger.Error("scan", fieldError, err) + } +} + +// resources represents the resources to prune. +type resources struct { + containers []string + networks []string + volumes []string + images []string +} + +// shutdownListener ensures that the listener is shutdown and no new clients +// are accepted. +func (r *reaper) shutdownListener() { + select { + case <-r.shutdown: + return // Already shutdown. + default: + close(r.shutdown) + r.listener.Close() + } +} + +// pruneWait waits for a prune condition to be met and returns the resources to prune. +// It will retry if changes are detected. +func (r *reaper) pruneWait(ctx context.Context) (*resources, error) { + defer r.shutdownListener() + + clients := 0 + pruneCheck := time.NewTicker(r.cfg.ConnectionTimeout) + done := ctx.Done() + for { + select { + case addr := <-r.connected: + clients++ + r.logger.Info("client connected", fieldAddress, addr, fieldClients, clients) + if clients == 1 { + pruneCheck.Stop() + } + case addr := <-r.disconnected: + clients-- + r.logger.Info("client disconnected", fieldAddress, addr, fieldClients, clients) + if clients == 0 && done != nil { + pruneCheck.Reset(r.cfg.ReconnectionTimeout) + } + case <-done: + r.logger.Info("signal received", fieldClients, clients, "shutdown_timeout", r.cfg.ShutdownTimeout) + // Force shutdown by closing the listener, scheduling + // a pruneCheck after shutdown timeout and setting done + // to nil so we don't enter this case again. + r.shutdownListener() + pruneCheck.Reset(r.cfg.ShutdownTimeout) + done = nil + case now := <-pruneCheck.C: + r.logger.Info("prune check", fieldClients, clients) + + if clients > 0 { + r.logger.Warn("shutdown timeout", fieldClients, clients) + } + + resources, err := r.resources(now.Add(r.cfg.RetryOffset)) //nolint:contextcheck // Needs its own context to ensure clean up completes. + if err != nil { + if errors.Is(err, errChangesDetected) { + r.logger.Warn("change detected, waiting again", fieldError, err) + continue + } + + return resources, fmt.Errorf("resources: %w", err) + } + + return resources, nil + } + } +} + +// resources returns the resources that match the collected filters. +func (r *reaper) resources(since time.Time) (*resources, error) { + var ret resources + var err error + var errs []error + // We combine errors so we can do best effort removal. + for _, args := range r.filterArgs() { + if ret.containers, err = r.affectedContainers(since, args); err != nil { + if !errors.Is(err, errChangesDetected) { + r.logger.Error("affected containers", fieldError, err) + } + errs = append(errs, fmt.Errorf("affected containers: %w", err)) + } + + if ret.networks, err = r.affectedNetworks(since, args); err != nil { + if !errors.Is(err, errChangesDetected) { + r.logger.Error("affected networks", fieldError, err) + } + errs = append(errs, fmt.Errorf("affected networks: %w", err)) + } + + if ret.volumes, err = r.affectedVolumes(since, args); err != nil { + if !errors.Is(err, errChangesDetected) { + r.logger.Error("affected volumes", fieldError, err) + } + errs = append(errs, fmt.Errorf("affected volumes: %w", err)) + } + + if ret.images, err = r.affectedImages(since, args); err != nil { + if !errors.Is(err, errChangesDetected) { + r.logger.Error("affected images", fieldError, err) + } + errs = append(errs, fmt.Errorf("affected images: %w", err)) + } + } + + return &ret, errors.Join(errs...) +} + +// affectedContainers returns a slice of container IDs that match the filters. +// If a matching container was created after since, an error is returned. +func (r *reaper) affectedContainers(since time.Time, args filters.Args) ([]string, error) { + ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout) + defer cancel() + + options := container.ListOptions{All: true, Filters: args} + r.logger.Debug("listing containers", "filter", options) + containers, err := r.client.ContainerList(ctx, options) + if err != nil { + return nil, fmt.Errorf("container list: %w", err) + } + + containerIDs := make([]string, 0, len(containers)) + for _, container := range containers { + if container.Labels[ryukLabel] == "true" { + // Ignore reaper containers. + r.logger.Debug("skipping reaper container", "id", container.ID) + continue + } + + created := time.Unix(container.Created, 0) + changed := created.After(since) + + r.logger.Debug("found container", + "id", container.ID, + "image", container.Image, + "names", container.Names, + "ports", container.Ports, + "state", container.State, + "labels", container.Labels, + "created", created, + "changed", changed, + "since", since, + ) + + if changed { + // Its not safe to remove a container which was created after + // the prune was initiated, as this may lead to unexpected behaviour. + return nil, fmt.Errorf("container %s: %w", container.ID, errChangesDetected) + } + + containerIDs = append(containerIDs, container.ID) + } + + return containerIDs, nil +} + +// affectedNetworks returns a list of network IDs that match the filters. +// If a matching network was created after since, an error is returned. +func (r *reaper) affectedNetworks(since time.Time, args filters.Args) ([]string, error) { + ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout) + defer cancel() + + options := network.ListOptions{Filters: args} + r.logger.Debug("listing networks", "options", options) + report, err := r.client.NetworkList(ctx, options) + if err != nil { + return nil, fmt.Errorf("network list: %w", err) + } + + networks := make([]string, 0, len(report)) + for _, network := range report { + changed := network.Created.After(since) + r.logger.Debug("found network", + "id", network.ID, + "created", network.Created, + "changed", changed, + "since", since, + ) + + if changed { + // Its not safe to remove a network which was created after + // the prune was initiated, as this may lead to unexpected behaviour. + return nil, fmt.Errorf("network %s: %w", network.ID, errChangesDetected) + } + + networks = append(networks, network.ID) + } + + return networks, nil +} + +// affectedVolumes returns a list of volume names that match the filters. +// If a matching volume was created after since, an error is returned. +func (r *reaper) affectedVolumes(since time.Time, args filters.Args) ([]string, error) { + ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout) + defer cancel() + + options := volume.ListOptions{Filters: args} + r.logger.Debug("listing volumes", "filter", options) + report, err := r.client.VolumeList(ctx, options) + if err != nil { + return nil, fmt.Errorf("volume list: %w", err) + } + + volumes := make([]string, 0, len(report.Volumes)) + for _, volume := range report.Volumes { + created, perr := time.Parse(time.RFC3339, volume.CreatedAt) + if perr != nil { + // Best effort, log and continue. + r.logger.Error("parse volume created", fieldError, perr, "volume", volume.Name) + continue + } + + changed := created.After(since) + r.logger.Debug("found volume", + "name", volume.Name, + "created", created, + "changed", changed, + "since", since, + ) + + if changed { + // Its not safe to remove a volume which was created after + // the prune was initiated, as this may lead to unexpected behaviour. + return nil, fmt.Errorf("volume %s: %w", volume.Name, errChangesDetected) + } + + volumes = append(volumes, volume.Name) + } + + return volumes, nil +} + +// affectedImages returns a list of image IDs that match the filters. +// If a matching volume was created after since, an error is returned. +func (r *reaper) affectedImages(since time.Time, args filters.Args) ([]string, error) { + ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout) + defer cancel() + + options := image.ListOptions{Filters: args} + r.logger.Debug("listing images", "filter", options) + report, err := r.client.ImageList(ctx, options) + if err != nil { + return nil, fmt.Errorf("image list: %w", err) + } + + images := make([]string, 0, len(report)) + for _, image := range report { + created := time.Unix(image.Created, 0) + changed := created.After(since) + r.logger.Debug("found image", + "id", image.ID, + "created", created, + "changed", changed, + "since", since, + ) + + if changed { + // Its not safe to remove an image which was created after + // the prune was initiated, as this may lead to unexpected behaviour. + return nil, fmt.Errorf("image %s: %w", image.ID, errChangesDetected) + } + + images = append(images, image.ID) + } + + return images, nil +} + +// addFilter adds a filter to prune. +// Safe to call concurrently. +func (r *reaper) addFilter(msg string) error { + query, err := url.ParseQuery(msg) + if err != nil { + return fmt.Errorf("parse query: %w", err) + } + + args := filters.NewArgs() + for filterType, values := range query { + for _, value := range values { + args.Add(filterType, value) + } + } + + // We can't use msg as it could be in any order. + data, err := args.MarshalJSON() + if err != nil { + return fmt.Errorf("marshal json: %w", err) + } + + key := string(data) + + r.mtx.Lock() + defer r.mtx.Unlock() + + if _, ok := r.filters[key]; ok { + r.logger.Debug("filter already exists", "key", key) + return nil + } + + r.logger.Debug("adding filter", "args", args, "key", key) + r.filters[key] = args + + return nil +} + +// filterArgs returns a slice of filter.Args to check against. +// Safe to call concurrently. +func (r *reaper) filterArgs() []filters.Args { + r.mtx.Lock() + defer r.mtx.Unlock() + + filters := make([]filters.Args, 0, len(r.filters)) + for _, args := range r.filters { + filters = append(filters, args) + } + + return filters +} + +// prune removes the specified resources. +func (r *reaper) prune(resources *resources) error { + var containers, networks, volumes, images int + var errs []error + + // Containers must be removed first. + errs = append(errs, r.remove("container", resources.containers, &containers, func(ctx context.Context, id string) error { + return r.client.ContainerRemove(ctx, id, containerRemoveOptions) + })) + + // Networks. + errs = append(errs, r.remove("network", resources.networks, &networks, func(ctx context.Context, id string) error { + return r.client.NetworkRemove(ctx, id) + })) + + // Volumes. + errs = append(errs, r.remove("volume", resources.volumes, &volumes, func(ctx context.Context, id string) error { + return r.client.VolumeRemove(ctx, id, volumeRemoveForce) + })) + + // Images. + errs = append(errs, r.remove("image", resources.images, &images, func(ctx context.Context, id string) error { + _, err := r.client.ImageRemove(ctx, id, imageRemoveOptions) + return err //nolint:wrapcheck // Wrapped by action. + })) + + r.logger.Info("removed", "containers", containers, "networks", networks, "volumes", volumes, "images", images) + + return errors.Join(errs...) +} + +// remove calls fn for each resource in resources and retries if necessary. +// Count is incremented for each resource that is successfully removed. +func (r *reaper) remove(resourceType string, resources []string, count *int, fn func(ctx context.Context, id string) error) error { + logger := r.logger.With("resource", resourceType) + logger.Debug("removing", "count", len(resources)) + + if len(resources) == 0 { + return nil + } + + todo := make(map[string]struct{}, len(resources)) + for _, id := range resources { + todo[id] = struct{}{} + } + + for attempt := 1; attempt <= r.cfg.RemoveRetries; attempt++ { + var retry bool + for id := range todo { + itemLogger := logger.With("id", id, "attempt", attempt) + + ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout) + defer cancel() + + itemLogger.Debug("remove") + if err := fn(ctx, id); err != nil { + if errdefs.IsNotFound(err) { + // Already removed. + itemLogger.Debug("not found") + continue + } + + itemLogger.Error("remove", fieldError, err) + retry = true + continue + } + + delete(todo, id) + *count++ + } + + if retry { + if attempt < r.cfg.RemoveRetries { + time.Sleep(time.Second) + } + continue + } + + // All items were removed. + return nil + } + + // Some items were not removed. + return fmt.Errorf("%s left %d items", resourceType, len(todo)) +} diff --git a/reaper_test.go b/reaper_test.go new file mode 100644 index 0000000..b3d894d --- /dev/null +++ b/reaper_test.go @@ -0,0 +1,521 @@ +package main + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "log/slog" + "net" + "strings" + "syscall" + "testing" + "time" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/image" + "github.com/docker/docker/api/types/network" + "github.com/docker/docker/api/types/volume" + "github.com/docker/docker/errdefs" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +const ( + containerID1 = "container1" + containerID2 = "container2" + networkID1 = "network1" + networkID2 = "network2" + volumeName1 = "volume1" + volumeName2 = "volume2" + imageID1 = "image1" + imageID2 = "image2" +) + +var ( + // testConfig is a config used for testing. + testConfig = withConfig(config{ + Port: 0, + ConnectionTimeout: time.Millisecond * 500, + ReconnectionTimeout: time.Millisecond * 100, + RequestTimeout: time.Millisecond * 50, + ShutdownTimeout: time.Second * 2, + RemoveRetries: 1, + RetryOffset: -time.Second * 2, + Verbose: true, + }) + + // discardLogger is a logger that discards all logs. + discardLogger = withLogger(slog.New(slog.NewTextHandler(io.Discard, nil))) + + // testLabels is a set of test labels. + testLabels = map[string]string{ + labelBase: "true", + labelBase + ".sessionID": "test-session", + labelBase + ".version": "0.1.0", + } + + // mockContext is a matcher that matches any context. + mockContext = mock.MatchedBy(func(context.Context) bool { return true }) + + // errNotFound is a docker not found error. + errNotFound = errdefs.NotFound(errors.New("not found")) +) + +func Test_newReaper(t *testing.T) { + ctx := context.Background() + t.Run("basic", func(t *testing.T) { + r, err := newReaper(ctx, discardLogger, testConfig) + require.NoError(t, err) + require.NotNil(t, r) + }) + + t.Run("with-config", func(t *testing.T) { + r, err := newReaper(ctx, discardLogger, testConfig) + require.NoError(t, err) + require.NotNil(t, r) + }) + + t.Run("bad-config", func(t *testing.T) { + r, err := newReaper(ctx, discardLogger, withConfig(config{})) + require.Error(t, err) + require.Nil(t, r) + }) + + t.Run("with-client", func(t *testing.T) { + client := &mockClient{} + client.On("Ping", mockContext).Return(types.Ping{}, nil) + client.On("NegotiateAPIVersion", mockContext).Return() + r, err := newReaper(ctx, discardLogger, testConfig, withClient(client)) + require.NoError(t, err) + require.NotNil(t, r) + }) +} + +// testConnect connects to the given endpoint, sends filter labels, +// and expects an ACK. The connection is closed when the context is done. +func testConnect(ctx context.Context, t *testing.T, endpoint string) { + t.Helper() + + var d net.Dialer + conn, err := d.DialContext(ctx, "tcp", endpoint) + require.NoError(t, err) + + labelFilters := make([]string, 0, len(testLabels)) + for l, v := range testLabels { + labelFilters = append(labelFilters, fmt.Sprintf("label=%s=%s", l, v)) + } + + _, err = conn.Write([]byte(strings.Join(labelFilters, "&") + "\n")) + require.NoError(t, err) + + buf := make([]byte, 4) + n, err := conn.Read(buf) + require.NoError(t, err) + require.Equal(t, "ACK\n", string(buf[:n])) + + go func() { + defer conn.Close() + <-ctx.Done() + }() +} + +// runTest is a test case for the reaper run method. +type runTest struct { + createdAt1 time.Time + pingErr error + + containerListErr error + containerRemoveErr1 error + containerRemoveErr2 error + containerCreated2 time.Time + + networkListErr error + networkRemoveErr1 error + networkRemoveErr2 error + networkCreated2 time.Time + + volumeListErr error + volumeRemoveErr1 error + volumeRemoveErr2 error + volumeCreated2 time.Time + + imageListErr error + imageRemoveErr1 error + imageRemoveErr2 error + imageCreated2 time.Time +} + +// newRunTest returns a new runTest with created at times set in the past. +func newRunTest() *runTest { + now := time.Now().Add(-time.Minute) + return &runTest{ + createdAt1: now, + containerCreated2: now, + networkCreated2: now, + volumeCreated2: now, + imageCreated2: now, + } +} + +// newMockClient returns a new mock client for the given test case. +func newMockClient(tc *runTest) *mockClient { + client := &mockClient{} + client.On("Ping", mock.Anything).Return(types.Ping{}, tc.pingErr) + client.On("NegotiateAPIVersion", mockContext).Return() + + // Mock the container list and remove calls. + client.On("ContainerList", mockContext, mock.Anything).Return([]types.Container{ + { + ID: containerID1, + Created: tc.createdAt1.Unix(), + Image: "testcontainers/test1:latest", + Names: []string{"test1"}, + Ports: []types.Port{{ + PrivatePort: 1001, + PublicPort: 8081, + Type: "tcp", + }}, + State: "running", + Labels: testLabels, + }, + { + ID: containerID2, + Created: tc.containerCreated2.Unix(), + Image: "testcontainers/test2:latest", + Names: []string{"test2"}, + Ports: []types.Port{{ + PrivatePort: 1002, + PublicPort: 8082, + Type: "tcp", + }}, + State: "running", + Labels: testLabels, + }, + }, tc.containerListErr) + + client.On("ContainerRemove", mockContext, containerID1, containerRemoveOptions). + Return(tc.containerRemoveErr1) + client.On("ContainerRemove", mockContext, containerID2, containerRemoveOptions). + Return(tc.containerRemoveErr2) + + // Mock the network list and remove calls. + client.On("NetworkList", mockContext, mock.Anything). + Return([]network.Summary{ + {ID: networkID1, Created: tc.createdAt1}, + {ID: networkID2, Created: tc.networkCreated2}, + }, tc.networkListErr) + client.On("NetworkRemove", mockContext, networkID1). + Return(tc.networkRemoveErr1) + client.On("NetworkRemove", mockContext, networkID2). + Return(tc.networkRemoveErr2) + + // Mock the volume list and remove calls. + client.On("VolumeList", mockContext, mock.Anything). + Return(volume.ListResponse{ + Volumes: []*volume.Volume{ + {Name: volumeName1, CreatedAt: tc.createdAt1.Format(time.RFC3339)}, + {Name: volumeName2, CreatedAt: tc.volumeCreated2.Format(time.RFC3339)}, + }, + }, tc.volumeListErr) + client.On("VolumeRemove", mockContext, volumeName1, volumeRemoveForce). + Return(tc.volumeRemoveErr1) + client.On("VolumeRemove", mockContext, volumeName2, volumeRemoveForce). + Return(tc.volumeRemoveErr2) + + // Mock the image list and remove calls. + client.On("ImageList", mockContext, mock.Anything).Return([]image.Summary{ + {ID: imageID1, Created: tc.createdAt1.Unix()}, + {ID: imageID2, Created: tc.imageCreated2.Unix()}, + }, tc.imageListErr) + client.On("ImageRemove", mockContext, imageID1, imageRemoveOptions). + Return([]image.DeleteResponse{{Deleted: imageID1}}, tc.imageRemoveErr1) + client.On("ImageRemove", mockContext, imageID2, imageRemoveOptions). + Return([]image.DeleteResponse{{Deleted: imageID2}}, tc.imageRemoveErr2) + + return client +} + +// testReaperRun runs the reaper with the given test case and returns the log output. +func testReaperRun(t *testing.T, tc *runTest) (string, error) { + t.Helper() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + t.Cleanup(cancel) + + var buf bytes.Buffer + logger := withLogger(slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{ + Level: slog.LevelDebug, + }))) + client := newMockClient(tc) + r, err := newReaper(ctx, logger, withClient(client), testConfig) + require.NoError(t, err) + + errCh := make(chan error, 1) + go func() { + errCh <- r.run(ctx) + }() + + clientCtx, clientCancel := context.WithTimeout(ctx, time.Millisecond*500) + t.Cleanup(clientCancel) + + addr := r.listener.Addr().String() + testConnect(clientCtx, t, addr) + testConnect(clientCtx, t, addr) + + select { + case err = <-errCh: + case <-ctx.Done(): + t.Fatal("timeout", buf.String()) + } + + // Standard checks for basic functionality. + log := buf.String() + require.Contains(t, log, "listening address="+addr) + require.Contains(t, log, "client connected") + require.Contains(t, log, "adding filter") + + return log, err +} + +func Test_newReaper_Run(t *testing.T) { + t.Run("end-to-end", func(t *testing.T) { + tc := newRunTest() + log, err := testReaperRun(t, tc) + require.NoError(t, err) + + require.NotContains(t, log, "level=ERROR") + require.NotContains(t, log, "level=WARN") + require.Contains(t, log, "removed containers=2 networks=2 volumes=2 images=2") + }) + + t.Run("container-created", func(t *testing.T) { + tc := newRunTest() + tc.containerCreated2 = time.Now().Add(time.Millisecond * 200) + log, err := testReaperRun(t, tc) + require.NoError(t, err) + + require.NotContains(t, log, "level=ERROR") + require.Contains(t, log, `msg="change detected, waiting again" error="affected containers: container container2: changes detected"`) + require.Contains(t, log, "removed containers=2 networks=2 volumes=2 images=2") + }) + + t.Run("network-created", func(t *testing.T) { + tc := newRunTest() + tc.networkCreated2 = time.Now().Add(time.Millisecond * 200) + log, err := testReaperRun(t, tc) + require.NoError(t, err) + + require.NotContains(t, log, "level=ERROR") + require.Contains(t, log, `msg="change detected, waiting again" error="affected networks: network network2: changes detected"`) + require.Contains(t, log, "removed containers=2 networks=2 volumes=2 images=2") + }) + + t.Run("volume-created", func(t *testing.T) { + tc := newRunTest() + tc.volumeCreated2 = time.Now().Add(time.Millisecond * 200) + log, err := testReaperRun(t, tc) + require.NoError(t, err) + + require.NotContains(t, log, "level=ERROR") + require.Contains(t, log, `msg="change detected, waiting again" error="affected volumes: volume volume2: changes detected"`) + require.Contains(t, log, "removed containers=2 networks=2 volumes=2 images=2") + }) + + t.Run("image-created", func(t *testing.T) { + tc := newRunTest() + tc.imageCreated2 = time.Now().Add(time.Millisecond * 200) + log, err := testReaperRun(t, tc) + require.NoError(t, err) + + require.NotContains(t, log, "level=ERROR") + require.Contains(t, log, `msg="change detected, waiting again" error="affected images: image image2: changes detected"`) + require.Contains(t, log, "removed containers=2 networks=2 volumes=2 images=2") + }) + + t.Run("not-found", func(t *testing.T) { + tc := newRunTest() + tc.containerRemoveErr1 = errNotFound + tc.networkRemoveErr1 = errNotFound + tc.volumeRemoveErr1 = errNotFound + tc.imageRemoveErr1 = errNotFound + log, err := testReaperRun(t, tc) + require.NoError(t, err) + + require.NotContains(t, log, "level=ERROR") + require.NotContains(t, log, "level=WARN") + require.Contains(t, log, "removed containers=1 networks=1 volumes=1 images=1") + }) + + t.Run("container-remove-error", func(t *testing.T) { + tc := newRunTest() + tc.containerRemoveErr1 = errors.New("remove error") + log, err := testReaperRun(t, tc) + require.EqualError(t, err, "prune: container left 1 items") + + require.Contains(t, log, "level=ERROR") + require.NotContains(t, log, "level=WARN") + require.Contains(t, log, "removed containers=1 networks=2 volumes=2 images=2") + }) + + t.Run("network-remove-error", func(t *testing.T) { + tc := newRunTest() + tc.networkRemoveErr1 = errors.New("remove error") + log, err := testReaperRun(t, tc) + require.EqualError(t, err, "prune: network left 1 items") + + require.Contains(t, log, "level=ERROR") + require.NotContains(t, log, "level=WARN") + require.Contains(t, log, "removed containers=2 networks=1 volumes=2 images=2") + }) + + t.Run("volume-remove-error", func(t *testing.T) { + tc := newRunTest() + tc.volumeRemoveErr1 = errors.New("remove error") + log, err := testReaperRun(t, tc) + require.EqualError(t, err, "prune: volume left 1 items") + + require.Contains(t, log, "level=ERROR") + require.NotContains(t, log, "level=WARN") + require.Contains(t, log, "removed containers=2 networks=2 volumes=1 images=2") + }) + + t.Run("image-remove-error", func(t *testing.T) { + tc := newRunTest() + tc.imageRemoveErr1 = errors.New("remove error") + log, err := testReaperRun(t, tc) + require.EqualError(t, err, "prune: image left 1 items") + + require.Contains(t, log, "level=ERROR") + require.NotContains(t, log, "level=WARN") + require.Contains(t, log, "removed containers=2 networks=2 volumes=2 images=1") + }) + + t.Run("container-list-error", func(t *testing.T) { + tc := newRunTest() + tc.containerListErr = errors.New("list error") + log, err := testReaperRun(t, tc) + require.Error(t, err) + require.Contains(t, err.Error(), "container list: "+tc.containerListErr.Error()) + + require.Contains(t, log, "level=ERROR") + require.NotContains(t, log, "level=WARN") + require.Contains(t, log, "removed containers=0 networks=2 volumes=2 images=2") + }) + + t.Run("network-list-error", func(t *testing.T) { + tc := newRunTest() + tc.networkListErr = errors.New("list error") + log, err := testReaperRun(t, tc) + require.Error(t, err) + require.Contains(t, err.Error(), "network list: "+tc.networkListErr.Error()) + + require.Contains(t, log, "level=ERROR") + require.NotContains(t, log, "level=WARN") + require.Contains(t, log, "removed containers=2 networks=0 volumes=2 images=2") + }) + + t.Run("volume-list-error", func(t *testing.T) { + tc := newRunTest() + tc.volumeListErr = errors.New("list error") + log, err := testReaperRun(t, tc) + require.Error(t, err) + require.Contains(t, err.Error(), "volume list: "+tc.volumeListErr.Error()) + + require.Contains(t, log, "level=ERROR") + require.NotContains(t, log, "level=WARN") + require.Contains(t, log, "removed containers=2 networks=2 volumes=0 images=2") + }) + + t.Run("image-list-error", func(t *testing.T) { + tc := newRunTest() + tc.imageListErr = errors.New("list error") + log, err := testReaperRun(t, tc) + require.Error(t, err) + require.Contains(t, err.Error(), "image list: "+tc.imageListErr.Error()) + + require.Contains(t, log, "level=ERROR") + require.NotContains(t, log, "level=WARN") + require.Contains(t, log, "removed containers=2 networks=2 volumes=2 images=0") + }) +} + +func TestAbortedClient(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + t.Cleanup(cancel) + + var log bytes.Buffer + logger := withLogger(slog.New(slog.NewTextHandler(&log, &slog.HandlerOptions{ + Level: slog.LevelDebug, + }))) + tc := newRunTest() + client := newMockClient(tc) + r, err := newReaper(ctx, logger, withClient(client), testConfig) + require.NoError(t, err) + + // Start processing clients. + go r.processClients() + + // Fake a shutdown to trigger the client abort. + close(r.shutdown) + + addr := r.listener.Addr().String() + + var d net.Dialer + conn, err := d.DialContext(ctx, "tcp", addr) + require.NoError(t, err) + + // With shutdown triggered the client should be aborted. + // The write will still succeed due to buffering but the read will fail. + _, err = conn.Write([]byte("some-filter\n")) + require.NoError(t, err) + + buf := make([]byte, 4) + n, err := conn.Read(buf) + require.Error(t, err) + switch { + case errors.Is(err, io.EOF), + errors.Is(err, syscall.ECONNRESET): + // Expected errors. + default: + t.Fatal("unexpected read error:", err) + } + require.Zero(t, n) + require.Contains(t, log.String(), "shutdown, aborting client") +} + +func TestShutdownTimeout(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + t.Cleanup(cancel) + + var log bytes.Buffer + logger := withLogger(slog.New(slog.NewTextHandler(&log, &slog.HandlerOptions{ + Level: slog.LevelDebug, + }))) + tc := newRunTest() + client := newMockClient(tc) + r, err := newReaper(ctx, logger, withClient(client), testConfig) + require.NoError(t, err) + + errCh := make(chan error, 1) + runCtx, runCancel := context.WithCancel(ctx) + t.Cleanup(runCancel) + go func() { + errCh <- r.run(runCtx) + }() + + require.NoError(t, err) + + testConnect(ctx, t, r.listener.Addr().String()) + runCancel() + + select { + case err = <-errCh: + require.NoError(t, err) + case <-ctx.Done(): + t.Fatal("timeout", log.String()) + } + + require.Contains(t, log.String(), "shutdown timeout") +} diff --git a/windows/Dockerfile b/windows/Dockerfile index 346468d..ded9c80 100644 --- a/windows/Dockerfile +++ b/windows/Dockerfile @@ -2,7 +2,7 @@ ARG BASE_IMAGE # ----------- # Build Image # ----------- -FROM golang:1.21-nanoserver AS build +FROM golang:1.22-nanoserver AS build # Go build env ENV CGO_ENABLED=0