Skip to content

Commit

Permalink
fix(reaper): refactor to allow retries and fix races
Browse files Browse the repository at this point in the history
Refactor how the reaper is created to allow for proper retries of
temporary errors such as container not found issues during startup or
shutdown races.

This eliminates the use of sync.Once which wasn't solving the problem at
hand and replaces it with a singleton spawner with locked access.

Wrap reaper errors so we can determine the cause of failures more
easily.

Fix race condition in port wait when loading from container by always
waiting for the port first.

Remove unnecessary use of buffering and invalid retry logic in reaper
connection handling which could never recover correctly from a partial
read.

Move the reaper creation just before connections are established in
compose to ensure its still running when the Connect calls are made.

Previously the reaper was requested in NewDockerComposeWith which
means it could have already shutdown before connections are made
during the later sections of Up if the startup took over 1 minute.

This was causing consistent failures for:
TestDockerComposeAPIWithWaitLogStrategy

Ensure that resource labels are correct so that resources aren't reaped
when the reaper is disabled by excluding session id when reaper is
disabled.

Error when creating a reaper when the config says it's disabled so that
we avoid hard to debug issues because a reaper is running when it
shouldn't be.
  • Loading branch information
stevenh committed Sep 12, 2024
1 parent b60497e commit 5de803c
Show file tree
Hide file tree
Showing 9 changed files with 842 additions and 685 deletions.
2 changes: 1 addition & 1 deletion container.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func (c *ContainerRequest) BuildOptions() (types.ImageBuildOptions, error) {
}

if !c.ShouldKeepBuiltImage() {
buildOptions.Labels = core.DefaultLabels(core.SessionID())
buildOptions.Labels = GenericLabels()
}

// Do this as late as possible to ensure we don't leak the context on error/panic.
Expand Down
102 changes: 51 additions & 51 deletions docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (c *DockerContainer) Inspect(ctx context.Context) (*types.ContainerJSON, er
func (c *DockerContainer) MappedPort(ctx context.Context, port nat.Port) (nat.Port, error) {
inspect, err := c.Inspect(ctx)
if err != nil {
return "", err
return "", fmt.Errorf("inspect: %w", err)
}
if inspect.ContainerJSONBase.HostConfig.NetworkMode == "host" {
return port, nil
Expand All @@ -199,7 +199,7 @@ func (c *DockerContainer) MappedPort(ctx context.Context, port nat.Port) (nat.Po
return nat.NewPort(k.Proto(), p[0].HostPort)
}

return "", errors.New("port not found")
return "", errdefs.NotFound(fmt.Errorf("port %q not found", port))
}

// Deprecated: use c.Inspect(ctx).NetworkSettings.Ports instead.
Expand Down Expand Up @@ -967,9 +967,7 @@ func (p *DockerProvider) BuildImage(ctx context.Context, img ImageBuildInfo) (st
}

// CreateContainer fulfils a request for a container without starting it
func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerRequest) (Container, error) {
var err error

func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerRequest) (con Container, err error) { //nolint:nonamedreturns // Needed for error checking.
// defer the close of the Docker client connection the soonest
defer p.Close()

Expand Down Expand Up @@ -1014,22 +1012,23 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
// the reaper does not need to start a reaper for itself
isReaperContainer := strings.HasSuffix(imageName, config.ReaperDefaultImage)
if !p.config.RyukDisabled && !isReaperContainer {
r, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), core.SessionID(), p)
r, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), core.SessionID(), p)
if err != nil {
return nil, fmt.Errorf("%w: creating reaper failed", err)
return nil, fmt.Errorf("reaper: %w", err)
}
termSignal, err = r.Connect()

termSignal, err := r.Connect()
if err != nil {
return nil, fmt.Errorf("%w: connecting to reaper failed", err)
return nil, fmt.Errorf("reaper connect: %w", err)
}
}

// Cleanup on error, otherwise set termSignal to nil before successful return.
defer func() {
if termSignal != nil {
termSignal <- true
}
}()
// Cleanup on error.
defer func() {
if err != nil {
termSignal <- true
}
}()
}

if err = req.Validate(); err != nil {
return nil, err
Expand Down Expand Up @@ -1095,10 +1094,9 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
}

if !isReaperContainer {
// add the labels that the reaper will use to terminate the container to the request
for k, v := range core.DefaultLabels(core.SessionID()) {
req.Labels[k] = v
}
// Add the labels that identify this as a testcontainers container and
// allow the reaper to terminate it if requested.
AddGenericLabels(req.Labels)
}

dockerInput := &container.Config{
Expand Down Expand Up @@ -1192,9 +1190,6 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
return nil, err
}

// Disable cleanup on success
termSignal = nil

return c, nil
}

Expand Down Expand Up @@ -1243,7 +1238,7 @@ func (p *DockerProvider) waitContainerCreation(ctx context.Context, name string)
)
}

func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req ContainerRequest) (Container, error) {
func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req ContainerRequest) (con Container, err error) { //nolint:nonamedreturns // Needed for error check.
c, err := p.findContainerByName(ctx, req.Name)
if err != nil {
return nil, err
Expand All @@ -1266,14 +1261,22 @@ func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req Contain

var termSignal chan bool
if !p.config.RyukDisabled {
r, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p)
r, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p)
if err != nil {
return nil, fmt.Errorf("reaper: %w", err)
}
termSignal, err = r.Connect()

termSignal, err := r.Connect()
if err != nil {
return nil, fmt.Errorf("%w: connecting to reaper failed", err)
return nil, fmt.Errorf("reaper connect: %w", err)
}

// Cleanup on error.
defer func() {
if err != nil {
termSignal <- true
}
}()
}

// default hooks include logger hook and pre-create hook
Expand Down Expand Up @@ -1441,9 +1444,7 @@ func daemonHost(ctx context.Context, p *DockerProvider) (string, error) {

// Deprecated: use network.New instead
// CreateNetwork returns the object representing a new network identified by its name
func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest) (Network, error) {
var err error

func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest) (net Network, err error) { //nolint:nonamedreturns // Needed for error check.
// defer the close of the Docker client connection the soonest
defer p.Close()

Expand Down Expand Up @@ -1472,31 +1473,30 @@ func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest)

var termSignal chan bool
if !p.config.RyukDisabled {
r, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p)
r, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p)
if err != nil {
return nil, fmt.Errorf("%w: creating network reaper failed", err)
return nil, fmt.Errorf("reaper: %w", err)
}
termSignal, err = r.Connect()

termSignal, err := r.Connect()
if err != nil {
return nil, fmt.Errorf("%w: connecting to network reaper failed", err)
return nil, fmt.Errorf("reaper connect: %w", err)
}
}

// add the labels that the reaper will use to terminate the network to the request
for k, v := range core.DefaultLabels(sessionID) {
req.Labels[k] = v
// Cleanup on error.
defer func() {
if err != nil {
termSignal <- true
}
}()
}

// Cleanup on error, otherwise set termSignal to nil before successful return.
defer func() {
if termSignal != nil {
termSignal <- true
}
}()
// add the labels that the reaper will use to terminate the network to the request
core.AddDefaultLabels(sessionID, req.Labels)

response, err := p.client.NetworkCreate(ctx, req.Name, nc)
if err != nil {
return &DockerNetwork{}, err
return &DockerNetwork{}, fmt.Errorf("create network: %w", err)
}

n := &DockerNetwork{
Expand All @@ -1507,9 +1507,6 @@ func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest)
provider: p,
}

// Disable cleanup on success
termSignal = nil

return n, nil
}

Expand Down Expand Up @@ -1579,9 +1576,12 @@ func (p *DockerProvider) getDefaultNetwork(ctx context.Context, cli client.APICl
_, err = cli.NetworkCreate(ctx, reaperNetwork, network.CreateOptions{
Driver: Bridge,
Attachable: true,
Labels: core.DefaultLabels(core.SessionID()),
Labels: GenericLabels(),
})
if err != nil {
// If the network already exists, we can ignore the error as that can
// happen if we are running multiple tests in parallel and we only
// need to ensure that the network exists.
if err != nil && !errdefs.IsConflict(err) {
return "", err
}
}
Expand Down Expand Up @@ -1619,7 +1619,7 @@ func containerFromDockerResponse(ctx context.Context, response types.Container)
// populate the raw representation of the container
jsonRaw, err := ctr.inspectRawContainer(ctx)
if err != nil {
return nil, err
return nil, fmt.Errorf("inspect raw container: %w", err)
}

// the health status of the container, if any
Expand Down
4 changes: 1 addition & 3 deletions docker_mounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,7 @@ func mapToDockerMounts(containerMounts ContainerMounts) []mount.Mount {
Labels: make(map[string]string),
}
}
for k, v := range GenericLabels() {
containerMount.VolumeOptions.Labels[k] = v
}
AddGenericLabels(containerMount.VolumeOptions.Labels)
}

mounts = append(mounts, containerMount)
Expand Down
12 changes: 11 additions & 1 deletion generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,17 @@ type GenericProvider interface {
ImageProvider
}

// GenericLabels returns a map of labels that can be used to identify containers created by this library
// GenericLabels returns a map of labels that can be used to identify resources
// created by this library. This includes the standard LabelSessionID if the
// reaper is enabled, otherwise this is excluded to prevent resources being
// incorrectly reaped.
func GenericLabels() map[string]string {
return core.DefaultLabels(core.SessionID())
}

// AddGenericLabels adds the generic labels to target.
func AddGenericLabels(target map[string]string) {
for k, v := range GenericLabels() {
target[k] = v
}
}
25 changes: 20 additions & 5 deletions internal/core/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"github.com/testcontainers/testcontainers-go/internal"
"github.com/testcontainers/testcontainers-go/internal/config"
)

const (
Expand All @@ -13,11 +14,25 @@ const (
LabelVersion = LabelBase + ".version"
)

// DefaultLabels returns the standard set of labels which
// includes LabelSessionID if the reaper is enabled.
func DefaultLabels(sessionID string) map[string]string {
return map[string]string{
LabelBase: "true",
LabelLang: "go",
LabelSessionID: sessionID,
LabelVersion: internal.Version,
labels := map[string]string{
LabelBase: "true",
LabelLang: "go",
LabelVersion: internal.Version,
}

if !config.Read().RyukDisabled {
labels[LabelSessionID] = sessionID
}

return labels
}

// AddDefaultLabels adds the default labels for sessionID to target.
func AddDefaultLabels(sessionID string, target map[string]string) {
for k, v := range DefaultLabels(sessionID) {
target[k] = v
}
}
19 changes: 0 additions & 19 deletions modules/compose/compose.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package compose
import (
"context"
"errors"
"fmt"
"io"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -153,23 +152,6 @@ func NewDockerComposeWith(opts ...ComposeStackOption) (*dockerCompose, error) {
return nil, fmt.Errorf("initialize docker client: %w", err)
}

reaperProvider, err := testcontainers.NewDockerProvider()
if err != nil {
return nil, fmt.Errorf("failed to create reaper provider for compose: %w", err)
}

var composeReaper *testcontainers.Reaper
if !reaperProvider.Config().Config.RyukDisabled {
// NewReaper is deprecated: we need to find a way to create the reaper for compose
// bypassing the deprecation.
r, err := testcontainers.NewReaper(context.Background(), testcontainers.SessionID(), reaperProvider, "")
if err != nil {
return nil, fmt.Errorf("failed to create reaper for compose: %w", err)
}

composeReaper = r
}

composeAPI := &dockerCompose{
name: composeOptions.Identifier,
configs: composeOptions.Paths,
Expand All @@ -182,7 +164,6 @@ func NewDockerComposeWith(opts ...ComposeStackOption) (*dockerCompose, error) {
containers: make(map[string]*testcontainers.DockerContainer),
networks: make(map[string]*testcontainers.DockerNetwork),
sessionID: testcontainers.SessionID(),
reaper: composeReaper,
}

return composeAPI, nil
Expand Down
Loading

0 comments on commit 5de803c

Please sign in to comment.