From c2dc1c58dda2c926bf1b8acb35944202bcee8c8c Mon Sep 17 00:00:00 2001 From: Daniel Bennett Date: Mon, 8 May 2023 13:17:10 -0500 Subject: [PATCH] full task cleanup when alloc prerun hook fails (#17104) to avoid leaking task resources (e.g. containers, iptables) if allocRunner prerun fails during restore on client restart. now if prerun fails, TaskRunner.MarkFailedKill() will only emit an event, mark the task as failed, and cancel the tr's killCtx, so then ar.runTasks() -> tr.Run() can take care of the actual cleanup. removed from (formerly) tr.MarkFailedDead(), now handled by tr.Run(): * set task state as dead * save task runner local state * task stop hooks also done in tr.Run() now that it's not skipped: * handleKill() to kill tasks while respecting their shutdown delay, and retrying as needed * also includes task preKill hooks * clearDriverHandle() to destroy the task and associated resources * task exited hooks --- .changelog/17104.txt | 3 + client/allocrunner/alloc_runner.go | 7 +- client/allocrunner/alloc_runner_hooks.go | 4 + client/allocrunner/fail_hook.go | 118 ++++++++++++++++++ client/allocrunner/taskrunner/task_runner.go | 33 ++--- .../taskrunner/task_runner_test.go | 65 +++++++++- client/client_test.go | 95 +++++++++++++- client/config/config.go | 10 +- nomad/mock/alloc.go | 30 +++++ testutil/wait.go | 4 +- 10 files changed, 334 insertions(+), 35 deletions(-) create mode 100644 .changelog/17104.txt create mode 100644 client/allocrunner/fail_hook.go diff --git a/.changelog/17104.txt b/.changelog/17104.txt new file mode 100644 index 00000000000..8df9033cbd0 --- /dev/null +++ b/.changelog/17104.txt @@ -0,0 +1,3 @@ +```release-note:bug +client: clean up resources upon failure to restore task during client restart +``` diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 112eb45b9a0..7545d46fe66 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -11,6 +11,7 @@ import ( log "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/client/allocrunner/state" @@ -347,17 +348,15 @@ func (ar *allocRunner) Run() { ar.logger.Error("prerun failed", "error", err) for _, tr := range ar.tasks { - tr.MarkFailedDead(fmt.Sprintf("failed to setup alloc: %v", err)) + // emit event and mark task to be cleaned up during runTasks() + tr.MarkFailedKill(fmt.Sprintf("failed to setup alloc: %v", err)) } - - goto POST } } // Run the runners (blocks until they exit) ar.runTasks() -POST: if ar.isShuttingDown() { return } diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index 097e6424a26..b03f72a6a24 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -8,6 +8,7 @@ import ( "time" multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" clientconfig "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/taskenv" @@ -138,6 +139,9 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { newCSIHook(alloc, hookLogger, ar.csiManager, ar.rpcClient, ar, ar.hookResources, ar.clientConfig.Node.SecretID), newChecksHook(hookLogger, alloc, ar.checkStore, ar), } + if config.ExtraAllocHooks != nil { + ar.runnerHooks = append(ar.runnerHooks, config.ExtraAllocHooks...) + } return nil } diff --git a/client/allocrunner/fail_hook.go b/client/allocrunner/fail_hook.go new file mode 100644 index 00000000000..abe5db3f85d --- /dev/null +++ b/client/allocrunner/fail_hook.go @@ -0,0 +1,118 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +// FailHook is designed to fail for testing purposes, +// so should never be included in a release. +//go:build !release + +package allocrunner + +import ( + "errors" + "fmt" + "os" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/hcl/v2/hclsimple" + + "github.com/hashicorp/nomad/client/allocrunner/interfaces" +) + +var ErrFailHookError = errors.New("failed successfully") + +func NewFailHook(l hclog.Logger, name string) *FailHook { + return &FailHook{ + name: name, + logger: l.Named(name), + } +} + +type FailHook struct { + name string + logger hclog.Logger + Fail struct { + Prerun bool `hcl:"prerun,optional"` + PreKill bool `hcl:"prekill,optional"` + Postrun bool `hcl:"postrun,optional"` + Destroy bool `hcl:"destroy,optional"` + Update bool `hcl:"update,optional"` + PreTaskRestart bool `hcl:"pretaskrestart,optional"` + Shutdown bool `hcl:"shutdown,optional"` + } +} + +func (h *FailHook) Name() string { + return h.name +} + +func (h *FailHook) LoadConfig(path string) *FailHook { + if _, err := os.Stat(path); os.IsNotExist(err) { + h.logger.Error("couldn't load config", "error", err) + return h + } + if err := hclsimple.DecodeFile(path, nil, &h.Fail); err != nil { + h.logger.Error("error parsing config", "path", path, "error", err) + } + return h +} + +var _ interfaces.RunnerPrerunHook = &FailHook{} + +func (h *FailHook) Prerun() error { + if h.Fail.Prerun { + return fmt.Errorf("prerun %w", ErrFailHookError) + } + return nil +} + +var _ interfaces.RunnerPreKillHook = &FailHook{} + +func (h *FailHook) PreKill() { + if h.Fail.PreKill { + h.logger.Error("prekill", "error", ErrFailHookError) + } +} + +var _ interfaces.RunnerPostrunHook = &FailHook{} + +func (h *FailHook) Postrun() error { + if h.Fail.Postrun { + return fmt.Errorf("postrun %w", ErrFailHookError) + } + return nil +} + +var _ interfaces.RunnerDestroyHook = &FailHook{} + +func (h *FailHook) Destroy() error { + if h.Fail.Destroy { + return fmt.Errorf("destroy %w", ErrFailHookError) + } + return nil +} + +var _ interfaces.RunnerUpdateHook = &FailHook{} + +func (h *FailHook) Update(request *interfaces.RunnerUpdateRequest) error { + if h.Fail.Update { + return fmt.Errorf("update %w", ErrFailHookError) + } + return nil +} + +var _ interfaces.RunnerTaskRestartHook = &FailHook{} + +func (h *FailHook) PreTaskRestart() error { + if h.Fail.PreTaskRestart { + return fmt.Errorf("destroy %w", ErrFailHookError) + } + return nil +} + +var _ interfaces.ShutdownHook = &FailHook{} + +func (h *FailHook) Shutdown() { + if h.Fail.Shutdown { + h.logger.Error("shutdown", "error", ErrFailHookError) + } +} diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index aaf3c010abc..817a6d19db2 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -11,13 +11,13 @@ import ( "sync" "time" - "github.com/hashicorp/nomad/client/lib/cgutil" "golang.org/x/exp/slices" metrics "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/hcl/v2/hcldec" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts" @@ -27,6 +27,7 @@ import ( "github.com/hashicorp/nomad/client/devicemanager" "github.com/hashicorp/nomad/client/dynamicplugins" cinterfaces "github.com/hashicorp/nomad/client/interfaces" + "github.com/hashicorp/nomad/client/lib/cgutil" "github.com/hashicorp/nomad/client/pluginmanager/csimanager" "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" "github.com/hashicorp/nomad/client/serviceregistration" @@ -495,30 +496,20 @@ func (tr *TaskRunner) initLabels() { } } -// MarkFailedDead marks a task as failed and not to run. Aimed to be invoked -// when alloc runner prestart hooks failed. Should never be called with Run(). -func (tr *TaskRunner) MarkFailedDead(reason string) { - defer close(tr.waitCh) - - tr.stateLock.Lock() - if err := tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState); err != nil { - //TODO Nomad will be unable to restore this task; try to kill - // it now and fail? In general we prefer to leave running - // tasks running even if the agent encounters an error. - tr.logger.Warn("error persisting local failed task state; may be unable to restore after a Nomad restart", - "error", err) - } - tr.stateLock.Unlock() - +// MarkFailedKill marks a task as failed and should be killed. +// It should be invoked when alloc runner prestart hooks fail. +// Afterwards, Run() will perform any necessary cleanup. +func (tr *TaskRunner) MarkFailedKill(reason string) { + // Emit an event that fails the task and gives reasons for humans. event := structs.NewTaskEvent(structs.TaskSetupFailure). + SetKillReason(structs.TaskRestoreFailed). SetDisplayMessage(reason). SetFailsTask() - tr.UpdateState(structs.TaskStateDead, event) + tr.EmitEvent(event) - // Run the stop hooks in case task was a restored task that failed prestart - if err := tr.stop(); err != nil { - tr.logger.Error("stop failed while marking task dead", "error", err) - } + // Cancel kill context, so later when allocRunner runs tr.Run(), + // we'll follow the usual kill path and do all the appropriate cleanup steps. + tr.killCtxCancel() } // Run the TaskRunner. Starts the user's task or reattaches to a restored task. diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index cfaf60fe9dc..b25a261f3f9 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -16,6 +16,12 @@ import ( "time" "github.com/golang/snappy" + "github.com/kr/pretty" + "github.com/shoenig/test" + "github.com/shoenig/test/must" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" @@ -41,10 +47,6 @@ import ( "github.com/hashicorp/nomad/plugins/device" "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/testutil" - "github.com/kr/pretty" - "github.com/shoenig/test/must" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) type MockTaskStateUpdater struct { @@ -662,6 +664,61 @@ func TestTaskRunner_Restore_System(t *testing.T) { }) } +// TestTaskRunner_MarkFailedKill asserts that MarkFailedKill marks the task as failed +// and cancels the killCtx so a subsequent Run() will do any necessary task cleanup. +func TestTaskRunner_MarkFailedKill(t *testing.T) { + ci.Parallel(t) + + // set up some taskrunner + alloc := mock.MinAlloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + t.Cleanup(cleanup) + tr, err := NewTaskRunner(conf) + must.NoError(t, err) + + // side quest: set this lifecycle coordination channel, + // so early in tr MAIN, it doesn't randomly follow that route. + // test config creates this already closed, but not so in real life. + startCh := make(chan struct{}) + t.Cleanup(func() { close(startCh) }) + tr.startConditionMetCh = startCh + + // function under test: should mark the task as failed and cancel kill context + reason := "because i said so" + tr.MarkFailedKill(reason) + + // explicitly check kill context. + select { + case <-tr.killCtx.Done(): + default: + t.Fatal("kill context should be done") + } + + // Run() should now follow the kill path. + go tr.Run() + + select { // it should finish up very quickly + case <-tr.WaitCh(): + case <-time.After(time.Second): + t.Error("task not killed (or not as fast as expected)") + } + + // check state for expected values and events + state := tr.TaskState() + + // this gets set directly by MarkFailedKill() + test.True(t, state.Failed, test.Sprint("task should have failed")) + // this is set in Run() + test.Eq(t, structs.TaskStateDead, state.State, test.Sprint("task should be dead")) + // reason "because i said so" should be a task event message + foundMessages := make(map[string]bool) + for _, event := range state.Events { + foundMessages[event.DisplayMessage] = true + } + test.True(t, foundMessages[reason], test.Sprintf("expected '%s' in events: %#v", reason, foundMessages)) +} + // TestTaskRunner_TaskEnv_Interpolated asserts driver configurations are // interpolated. func TestTaskRunner_TaskEnv_Interpolated(t *testing.T) { diff --git a/client/client_test.go b/client/client_test.go index 2c8af3e4e39..fb45586fbd2 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -14,7 +14,14 @@ import ( "time" memdb "github.com/hashicorp/go-memdb" + "github.com/shoenig/test" + "github.com/shoenig/test/must" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/client/allocrunner" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/fingerprint" @@ -33,8 +40,6 @@ import ( "github.com/hashicorp/nomad/plugins/device" psstructs "github.com/hashicorp/nomad/plugins/shared/structs" "github.com/hashicorp/nomad/testutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func testACLServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string, *structs.ACLToken, func()) { @@ -1796,3 +1801,89 @@ func TestClient_ReconnectAllocs(t *testing.T) { require.False(t, invalid, "expected alloc to not be marked invalid") require.Equal(t, unknownAlloc.AllocModifyIndex, finalAlloc.AllocModifyIndex) } + +// TestClient_AllocPrerunErrorDuringRestore ensures that a running allocation, +// which fails Prerun during Restore on client restart, should be killed. +func TestClient_AllocPrerunErrorDuringRestore(t *testing.T) { + ci.Parallel(t) + + logger := testlog.HCLogger(t) + + // set up server + server, _, cleanS1 := testServer(t, nil) + t.Cleanup(cleanS1) + testutil.WaitForLeader(t, server.RPC) + + // set up first client, which will initially start the job cleanly + c1, cleanC1 := TestClient(t, func(c *config.Config) { + c.DevMode = false // so state persists to client 2 + c.RPCHandler = server + }) + t.Cleanup(func() { + test.NoError(t, cleanC1()) + }) + waitTilNodeReady(c1, t) + + // register a happy job to run until we cause it to fail + job := mock.MinJob() + testutil.RegisterJob(t, server.RPC, job) + + // wait for our alloc to be running + testutil.WaitForJobAllocStatus(t, server.RPC, job, map[string]int{ + structs.AllocClientStatusRunning: 1, + }) + t.Logf("job %s allocs running 👍", job.ID) + + // stop client 1, shutdown will dump state to disk but leave allocs running + must.NoError(t, c1.Shutdown()) + + // make a new client, using parts from the old one to be able to restore state + restoreClient := func() { + conf := c1.config.Copy() + // we want the prerun hook to fail + hook := allocrunner.NewFailHook(logger, t.Name()) + hook.Fail.Prerun = true + conf.ExtraAllocHooks = []interfaces.RunnerHook{hook} + + // this is so in-memory driver handles from client 1 can be restored by client 2 + conf.PluginSingletonLoader = singleton.NewSingletonLoader(logger, c1.config.PluginLoader) + + // actually make and start the client + c2, err := NewClient(conf, c1.consulCatalog, nil, c1.consulService, nil) + must.NoError(t, err) + t.Cleanup(func() { + test.NoError(t, c2.Shutdown()) + }) + } + restoreClient() + + // wait for the client to pick up the alloc and fail prerun hook + testutil.WaitForJobAllocStatus(t, server.RPC, job, map[string]int{ + structs.AllocClientStatusFailed: 1, + }) + t.Logf("job %s allocs failed 👍", job.ID) + + // ok, final assertions + allocs, err := server.State().AllocsByJob(nil, job.Namespace, job.ID, true) + must.NoError(t, err) + + ts := allocs[0].TaskStates["t"] + test.True(t, ts.Failed) + test.Eq(t, structs.TaskStateDead, ts.State) + + expectEvents := []string{ + // initial successful setup + structs.TaskReceived, + structs.TaskSetup, + structs.TaskStarted, + // after prerun error during restore + structs.TaskSetupFailure, + structs.TaskTerminated, // this whole test is to ensure this happens. + } + var actual []string + for _, event := range ts.Events { + actual = append(actual, event.Type) + } + must.Eq(t, expectEvents, actual) + test.StrContains(t, ts.Events[3].DisplayMessage, allocrunner.ErrFailHookError.Error()) +} diff --git a/client/config/config.go b/client/config/config.go index 0f4b3ea0a70..e46eda76d0d 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -14,13 +14,14 @@ import ( "time" "github.com/hashicorp/consul-template/config" - "github.com/hashicorp/nomad/client/lib/cgutil" - "github.com/hashicorp/nomad/command/agent/host" + log "github.com/hashicorp/go-hclog" "golang.org/x/exp/maps" "golang.org/x/exp/slices" - log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/client/lib/cgutil" "github.com/hashicorp/nomad/client/state" + "github.com/hashicorp/nomad/command/agent/host" "github.com/hashicorp/nomad/helper/bufconndialer" "github.com/hashicorp/nomad/helper/pluginutils/loader" "github.com/hashicorp/nomad/helper/pointer" @@ -319,6 +320,9 @@ type Config struct { // Drain configuration from the agent's config file. Drain *DrainConfig + + // ExtraAllocHooks are run with other allocation hooks, mainly for testing. + ExtraAllocHooks []interfaces.RunnerHook } type APIListenerRegistrar interface { diff --git a/nomad/mock/alloc.go b/nomad/mock/alloc.go index 95fafe6ea06..91f318520b2 100644 --- a/nomad/mock/alloc.go +++ b/nomad/mock/alloc.go @@ -86,6 +86,36 @@ func Alloc() *structs.Allocation { return alloc } +func MinAlloc() *structs.Allocation { + job := MinJob() + group := job.TaskGroups[0] + task := group.Tasks[0] + return &structs.Allocation{ + ID: uuid.Generate(), + EvalID: uuid.Generate(), + NodeID: uuid.Generate(), + Job: job, + TaskGroup: group.Name, + ClientStatus: structs.AllocClientStatusPending, + DesiredStatus: structs.AllocDesiredStatusRun, + AllocatedResources: &structs.AllocatedResources{ + Tasks: map[string]*structs.AllocatedTaskResources{ + task.Name: { + Cpu: structs.AllocatedCpuResources{ + CpuShares: 100, + }, + Memory: structs.AllocatedMemoryResources{ + MemoryMB: 256, + }, + }, + }, + Shared: structs.AllocatedSharedResources{ + DiskMB: 150, + }, + }, + } +} + func AllocWithoutReservedPort() *structs.Allocation { alloc := Alloc() alloc.Resources.Networks[0].ReservedPorts = nil diff --git a/testutil/wait.go b/testutil/wait.go index 1436377982f..7072d629282 100644 --- a/testutil/wait.go +++ b/testutil/wait.go @@ -11,10 +11,11 @@ import ( "time" "github.com/google/go-cmp/cmp" - "github.com/hashicorp/nomad/nomad/structs" "github.com/kr/pretty" "github.com/shoenig/test/must" "github.com/shoenig/test/wait" + + "github.com/hashicorp/nomad/nomad/structs" ) type testFn func() (bool, error) @@ -241,6 +242,7 @@ func WaitForVotingMembers(t testing.TB, rpc rpcFn, nPeers int) { // RegisterJobWithToken registers a job and uses the job's Region and Namespace. func RegisterJobWithToken(t testing.TB, rpc rpcFn, job *structs.Job, token string) { + t.Helper() WaitForResult(func() (bool, error) { args := &structs.JobRegisterRequest{} args.Job = job