Skip to content

Commit

Permalink
full task cleanup when alloc prerun hook fails (#17104)
Browse files Browse the repository at this point in the history
to avoid leaking task resources (e.g. containers,
iptables) if allocRunner prerun fails during
restore on client restart.

now if prerun fails, TaskRunner.MarkFailedKill()
will only emit an event, mark the task as failed,
and cancel the tr's killCtx, so then ar.runTasks()
-> tr.Run() can take care of the actual cleanup.

removed from (formerly) tr.MarkFailedDead(),
now handled by tr.Run():
 * set task state as dead
 * save task runner local state
 * task stop hooks

also done in tr.Run() now that it's not skipped:
 * handleKill() to kill tasks while respecting
   their shutdown delay, and retrying as needed
   * also includes task preKill hooks
 * clearDriverHandle() to destroy the task
   and associated resources
 * task exited hooks
  • Loading branch information
gulducat authored May 8, 2023
1 parent 58a7d40 commit c2dc1c5
Show file tree
Hide file tree
Showing 10 changed files with 334 additions and 35 deletions.
3 changes: 3 additions & 0 deletions .changelog/17104.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
client: clean up resources upon failure to restore task during client restart
```
7 changes: 3 additions & 4 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/allocrunner/state"
Expand Down Expand Up @@ -347,17 +348,15 @@ func (ar *allocRunner) Run() {
ar.logger.Error("prerun failed", "error", err)

for _, tr := range ar.tasks {
tr.MarkFailedDead(fmt.Sprintf("failed to setup alloc: %v", err))
// emit event and mark task to be cleaned up during runTasks()
tr.MarkFailedKill(fmt.Sprintf("failed to setup alloc: %v", err))
}

goto POST
}
}

// Run the runners (blocks until they exit)
ar.runTasks()

POST:
if ar.isShuttingDown() {
return
}
Expand Down
4 changes: 4 additions & 0 deletions client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

multierror "github.com/hashicorp/go-multierror"

"github.com/hashicorp/nomad/client/allocrunner/interfaces"
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/taskenv"
Expand Down Expand Up @@ -138,6 +139,9 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
newCSIHook(alloc, hookLogger, ar.csiManager, ar.rpcClient, ar, ar.hookResources, ar.clientConfig.Node.SecretID),
newChecksHook(hookLogger, alloc, ar.checkStore, ar),
}
if config.ExtraAllocHooks != nil {
ar.runnerHooks = append(ar.runnerHooks, config.ExtraAllocHooks...)
}

return nil
}
Expand Down
118 changes: 118 additions & 0 deletions client/allocrunner/fail_hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

// FailHook is designed to fail for testing purposes,
// so should never be included in a release.
//go:build !release

package allocrunner

import (
"errors"
"fmt"
"os"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/hcl/v2/hclsimple"

"github.com/hashicorp/nomad/client/allocrunner/interfaces"
)

var ErrFailHookError = errors.New("failed successfully")

func NewFailHook(l hclog.Logger, name string) *FailHook {
return &FailHook{
name: name,
logger: l.Named(name),
}
}

type FailHook struct {
name string
logger hclog.Logger
Fail struct {
Prerun bool `hcl:"prerun,optional"`
PreKill bool `hcl:"prekill,optional"`
Postrun bool `hcl:"postrun,optional"`
Destroy bool `hcl:"destroy,optional"`
Update bool `hcl:"update,optional"`
PreTaskRestart bool `hcl:"pretaskrestart,optional"`
Shutdown bool `hcl:"shutdown,optional"`
}
}

func (h *FailHook) Name() string {
return h.name
}

func (h *FailHook) LoadConfig(path string) *FailHook {
if _, err := os.Stat(path); os.IsNotExist(err) {
h.logger.Error("couldn't load config", "error", err)
return h
}
if err := hclsimple.DecodeFile(path, nil, &h.Fail); err != nil {
h.logger.Error("error parsing config", "path", path, "error", err)
}
return h
}

var _ interfaces.RunnerPrerunHook = &FailHook{}

func (h *FailHook) Prerun() error {
if h.Fail.Prerun {
return fmt.Errorf("prerun %w", ErrFailHookError)
}
return nil
}

var _ interfaces.RunnerPreKillHook = &FailHook{}

func (h *FailHook) PreKill() {
if h.Fail.PreKill {
h.logger.Error("prekill", "error", ErrFailHookError)
}
}

var _ interfaces.RunnerPostrunHook = &FailHook{}

func (h *FailHook) Postrun() error {
if h.Fail.Postrun {
return fmt.Errorf("postrun %w", ErrFailHookError)
}
return nil
}

var _ interfaces.RunnerDestroyHook = &FailHook{}

func (h *FailHook) Destroy() error {
if h.Fail.Destroy {
return fmt.Errorf("destroy %w", ErrFailHookError)
}
return nil
}

var _ interfaces.RunnerUpdateHook = &FailHook{}

func (h *FailHook) Update(request *interfaces.RunnerUpdateRequest) error {
if h.Fail.Update {
return fmt.Errorf("update %w", ErrFailHookError)
}
return nil
}

var _ interfaces.RunnerTaskRestartHook = &FailHook{}

func (h *FailHook) PreTaskRestart() error {
if h.Fail.PreTaskRestart {
return fmt.Errorf("destroy %w", ErrFailHookError)
}
return nil
}

var _ interfaces.ShutdownHook = &FailHook{}

func (h *FailHook) Shutdown() {
if h.Fail.Shutdown {
h.logger.Error("shutdown", "error", ErrFailHookError)
}
}
33 changes: 12 additions & 21 deletions client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (
"sync"
"time"

"github.com/hashicorp/nomad/client/lib/cgutil"
"golang.org/x/exp/slices"

metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/hcl/v2/hcldec"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts"
Expand All @@ -27,6 +27,7 @@ import (
"github.com/hashicorp/nomad/client/devicemanager"
"github.com/hashicorp/nomad/client/dynamicplugins"
cinterfaces "github.com/hashicorp/nomad/client/interfaces"
"github.com/hashicorp/nomad/client/lib/cgutil"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
"github.com/hashicorp/nomad/client/serviceregistration"
Expand Down Expand Up @@ -495,30 +496,20 @@ func (tr *TaskRunner) initLabels() {
}
}

// MarkFailedDead marks a task as failed and not to run. Aimed to be invoked
// when alloc runner prestart hooks failed. Should never be called with Run().
func (tr *TaskRunner) MarkFailedDead(reason string) {
defer close(tr.waitCh)

tr.stateLock.Lock()
if err := tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState); err != nil {
//TODO Nomad will be unable to restore this task; try to kill
// it now and fail? In general we prefer to leave running
// tasks running even if the agent encounters an error.
tr.logger.Warn("error persisting local failed task state; may be unable to restore after a Nomad restart",
"error", err)
}
tr.stateLock.Unlock()

// MarkFailedKill marks a task as failed and should be killed.
// It should be invoked when alloc runner prestart hooks fail.
// Afterwards, Run() will perform any necessary cleanup.
func (tr *TaskRunner) MarkFailedKill(reason string) {
// Emit an event that fails the task and gives reasons for humans.
event := structs.NewTaskEvent(structs.TaskSetupFailure).
SetKillReason(structs.TaskRestoreFailed).
SetDisplayMessage(reason).
SetFailsTask()
tr.UpdateState(structs.TaskStateDead, event)
tr.EmitEvent(event)

// Run the stop hooks in case task was a restored task that failed prestart
if err := tr.stop(); err != nil {
tr.logger.Error("stop failed while marking task dead", "error", err)
}
// Cancel kill context, so later when allocRunner runs tr.Run(),
// we'll follow the usual kill path and do all the appropriate cleanup steps.
tr.killCtxCancel()
}

// Run the TaskRunner. Starts the user's task or reattaches to a restored task.
Expand Down
65 changes: 61 additions & 4 deletions client/allocrunner/taskrunner/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ import (
"time"

"github.com/golang/snappy"
"github.com/kr/pretty"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
Expand All @@ -41,10 +47,6 @@ import (
"github.com/hashicorp/nomad/plugins/device"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type MockTaskStateUpdater struct {
Expand Down Expand Up @@ -662,6 +664,61 @@ func TestTaskRunner_Restore_System(t *testing.T) {
})
}

// TestTaskRunner_MarkFailedKill asserts that MarkFailedKill marks the task as failed
// and cancels the killCtx so a subsequent Run() will do any necessary task cleanup.
func TestTaskRunner_MarkFailedKill(t *testing.T) {
ci.Parallel(t)

// set up some taskrunner
alloc := mock.MinAlloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
t.Cleanup(cleanup)
tr, err := NewTaskRunner(conf)
must.NoError(t, err)

// side quest: set this lifecycle coordination channel,
// so early in tr MAIN, it doesn't randomly follow that route.
// test config creates this already closed, but not so in real life.
startCh := make(chan struct{})
t.Cleanup(func() { close(startCh) })
tr.startConditionMetCh = startCh

// function under test: should mark the task as failed and cancel kill context
reason := "because i said so"
tr.MarkFailedKill(reason)

// explicitly check kill context.
select {
case <-tr.killCtx.Done():
default:
t.Fatal("kill context should be done")
}

// Run() should now follow the kill path.
go tr.Run()

select { // it should finish up very quickly
case <-tr.WaitCh():
case <-time.After(time.Second):
t.Error("task not killed (or not as fast as expected)")
}

// check state for expected values and events
state := tr.TaskState()

// this gets set directly by MarkFailedKill()
test.True(t, state.Failed, test.Sprint("task should have failed"))
// this is set in Run()
test.Eq(t, structs.TaskStateDead, state.State, test.Sprint("task should be dead"))
// reason "because i said so" should be a task event message
foundMessages := make(map[string]bool)
for _, event := range state.Events {
foundMessages[event.DisplayMessage] = true
}
test.True(t, foundMessages[reason], test.Sprintf("expected '%s' in events: %#v", reason, foundMessages))
}

// TestTaskRunner_TaskEnv_Interpolated asserts driver configurations are
// interpolated.
func TestTaskRunner_TaskEnv_Interpolated(t *testing.T) {
Expand Down
Loading

0 comments on commit c2dc1c5

Please sign in to comment.