Skip to content

Commit

Permalink
Task.Status to hold records of status updates
Browse files Browse the repository at this point in the history
This updates the Task.Status attribute to hold records of status
updates, which looks like this,

```
  "status": {
    "records": [
      {
        "msg": "initialized task",
        "ts": "2023-10-20T10:34:28.845922256Z"
      },
      {
        "msg": "connecting to device BMC",
        "ts": "2023-10-20T10:34:28.934251833Z"
      },
      {
        "msg": "collecting inventory from device BMC",
        "ts": "2023-10-20T10:34:31.862766437Z"
      },
      {
        "msg": "[bmc] component firmware version equal",
        "ts": "2023-10-20T10:35:09.731592903Z"
      },
      {
        "msg": "[bios] component firmware version equal",
        "ts": "2023-10-20T10:35:09.731647244Z"
      },
      {
        "msg": "task completed successfully",
        "ts": "2023-10-20T10:35:09.740624084Z"
      }
    ]
  },
```
  • Loading branch information
joelrebel committed Oct 23, 2023
1 parent 7e116de commit 61f522b
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 75 deletions.
40 changes: 39 additions & 1 deletion internal/model/task.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package model

import (
"encoding/json"
"time"

sw "github.com/filanov/stateswitch"
Expand Down Expand Up @@ -116,7 +117,7 @@ type Task struct {
state rctypes.State

// status holds informational data on the state
Status string
Status StatusRecord

// Flasher determines the firmware to be installed for each component based on the firmware plan method.
FirmwarePlanMethod FirmwarePlanMethod
Expand Down Expand Up @@ -147,3 +148,40 @@ func (t *Task) SetState(state sw.State) error {
func (t *Task) State() sw.State {
return sw.State(t.state)
}

func NewTaskStatusRecord(s string) StatusRecord {
sr := StatusRecord{}
sr.Append(s)

return sr
}

type StatusRecord struct {
StatusMsgs []StatusMsg `json:"records"`
}

type StatusMsg struct {
Timestamp time.Time `json:"ts,omitempty"`
Msg string `json:"msg,omitempty"`
}

func (sr *StatusRecord) Append(s string) {
for _, r := range sr.StatusMsgs {
if r.Msg == s {
return
}
}

n := StatusMsg{Timestamp: time.Now(), Msg: s}

sr.StatusMsgs = append(sr.StatusMsgs, n)
}

func (sr *StatusRecord) String() string {
b, err := json.Marshal(sr)
if err != nil {
panic(err)
}

return string(b)
}
2 changes: 1 addition & 1 deletion internal/outofband/actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

func newTaskFixture(status string) *model.Task {
task := &model.Task{}
task.Status = string(status)
task.Status.Append(status)

// task.Parameters.Device =
return task
Expand Down
14 changes: 7 additions & 7 deletions internal/statemachine/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,12 @@ func (a *ActionStateMachine) Run(ctx context.Context, action *model.Action, tctx
"step": transitionType,
}).Info("running action step")

tctx.Task.Status = fmt.Sprintf(
tctx.Task.Status.Append(fmt.Sprintf(
"component: %s, install version: %s, running step %s",
action.Firmware.Component,
action.Firmware.Version,
string(transitionType),
)
))

tctx.Publisher.Publish(tctx)

Expand All @@ -202,12 +202,12 @@ func (a *ActionStateMachine) Run(ctx context.Context, action *model.Action, tctx
a.transitionsCompleted = append(a.transitionsCompleted, transitionType)
a.registerTransitionMetrics(startTS, action, string(transitionType), "succeeded")

tctx.Task.Status = fmt.Sprintf(
tctx.Task.Status.Append(fmt.Sprintf(
"component: %s, install version: %s, completed step %s",
action.Firmware.Component,
action.Firmware.Version,
string(transitionType),
)
))

tctx.Publisher.Publish(tctx)

Expand Down Expand Up @@ -252,11 +252,11 @@ func (a *ActionStateMachine) Run(ctx context.Context, action *model.Action, tctx
)
}

tctx.Task.Status = fmt.Sprintf(
"component: %s, completed firmware install, version: %s",
tctx.Task.Status.Append(fmt.Sprintf(
"[%s] component, completed firmware install, version: %s",
action.Firmware.Component,
action.Firmware.Version,
)
))

tctx.Publisher.Publish(tctx)

Expand Down
2 changes: 1 addition & 1 deletion internal/statemachine/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func (m *TaskStateMachine) Run(task *model.Task, tctx *HandlerContext) error {
// task failure handler
taskFailed := func(err error) error {
// include error in task
task.Status = err.Error()
task.Status.Append(err.Error())

// run transition failed handler
if txErr := m.TransitionFailed(task, tctx); txErr != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/statemachine/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func newTaskFixture(t *testing.T, state string, fault *rctypes.Fault) *model.Tas

func Test_NewTaskStateMachine(t *testing.T) {
task := &model.Task{}
task.Status = string(model.StatePending)
task.Status.Append(string(model.StatePending))

tests := []struct {
name string
Expand Down
2 changes: 1 addition & 1 deletion internal/worker/kv_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func statusFromContext(hCtx *sm.HandlerContext) []byte {
TraceID: trace.SpanFromContext(hCtx.Ctx).SpanContext().TraceID().String(),
SpanID: trace.SpanFromContext(hCtx.Ctx).SpanContext().SpanID().String(),
State: string(hCtx.Task.State()),
Status: statusInfoJSON(hCtx.Task.Status),
Status: json.RawMessage(hCtx.Task.Status.String()),
// ResourceVersion: XXX: the handler context has no concept of this! does this make
// sense at the controller-level?
UpdatedAt: time.Now(),
Expand Down
2 changes: 1 addition & 1 deletion internal/worker/kv_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestPublisher(t *testing.T) {
Ctx: context.TODO(),
Task: &model.Task{
ID: taskID,
Status: "some-status",
Status: model.NewTaskStatusRecord("some-status"),
},
WorkerID: registry.GetID("kvtest"),
Asset: &model.Asset{
Expand Down
73 changes: 25 additions & 48 deletions internal/worker/task_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,6 @@ var (
errTaskPlanActions = errors.New("error in task action planning")
)

// installSkippedReasons is map of component names to a reason for why a firmware version was removed from the install list.
type installSkippedReasons map[string]string

func (f installSkippedReasons) String() string {
s := ""

for component, cause := range f {
s += fmt.Sprintf("[%s] %s; ", component, cause)
}

return s
}

// taskHandler implements the taskTransitionHandler methods
type taskHandler struct{}

Expand Down Expand Up @@ -181,6 +168,8 @@ func (h *taskHandler) TaskFailed(_ sw.StateSwitch, args sw.TransitionArgs) error
return sm.ErrInvalidTransitionHandler
}

tctx.Task.Status.Append("task failed")

if tctx.DeviceQueryor != nil {
if err := tctx.DeviceQueryor.Close(tctx.Ctx); err != nil {
tctx.Logger.WithFields(logrus.Fields{"err": err.Error()}).Warn("device logout error")
Expand All @@ -190,32 +179,13 @@ func (h *taskHandler) TaskFailed(_ sw.StateSwitch, args sw.TransitionArgs) error
return nil
}

func (h *taskHandler) TaskSuccessful(t sw.StateSwitch, args sw.TransitionArgs) error {
func (h *taskHandler) TaskSuccessful(_ sw.StateSwitch, args sw.TransitionArgs) error {
tctx, ok := args.(*sm.HandlerContext)
if !ok {
return sm.ErrInvalidTransitionHandler
}

task, ok := t.(*model.Task)
if !ok {
return errors.Wrap(ErrSaveTask, ErrTaskTypeAssertion.Error())
}

// summarize task status
statuses := []string{}
for _, actionSM := range tctx.ActionStateMachines {
action := task.ActionsPlanned.ByID(actionSM.ActionID())
s := fmt.Sprintf(
"[%s] install firmware: %s, state: %s",
action.Firmware.Component,
action.Firmware.Version,
action.State(),
)

statuses = append(statuses, s)
}

tctx.Task.Status = strings.Join(statuses, "; ")
tctx.Task.Status.Append("task completed successfully")

if tctx.DeviceQueryor != nil {
if err := tctx.DeviceQueryor.Close(tctx.Ctx); err != nil {
Expand Down Expand Up @@ -269,14 +239,14 @@ func (h *taskHandler) queryFromDevice(tctx *sm.HandlerContext) (model.Components
tctx.DeviceQueryor = outofband.NewDeviceQueryor(tctx.Ctx, tctx.Asset, tctx.Logger)
}

tctx.Task.Status = "connecting to device BMC"
tctx.Task.Status.Append("connecting to device BMC")
tctx.Publisher.Publish(tctx)

if err := tctx.DeviceQueryor.Open(tctx.Ctx); err != nil {
return nil, err
}

tctx.Task.Status = "collecting inventory from device BMC"
tctx.Task.Status.Append("collecting inventory from device BMC")
tctx.Publisher.Publish(tctx)

deviceCommon, err := tctx.DeviceQueryor.Inventory(tctx.Ctx)
Expand Down Expand Up @@ -311,16 +281,14 @@ func (h *taskHandler) planInstall(hCtx *sm.HandlerContext, task *model.Task, fir
}).Debug("checking against current inventory")

toInstall := firmwares
skippedReasons := installSkippedReasons{}

if !task.Parameters.ForceInstall {
toInstall, skippedReasons = removeFirmwareAlreadyAtDesiredVersion(hCtx, firmwares)
toInstall = removeFirmwareAlreadyAtDesiredVersion(hCtx, firmwares)
}

if len(toInstall) == 0 {
info := "no action required for this task"
info := "no actions required for this task"

task.Status = fmt.Sprintf("%s: %s", info, skippedReasons.String())
hCtx.Publisher.Publish(hCtx)
hCtx.Logger.Info(info)

Expand Down Expand Up @@ -387,17 +355,22 @@ func sortFirmwareByInstallOrder(firmwares []*model.Firmware) {
}

// returns a list of firmware applicable and a list of causes for firmwares that were removed from the install list.
func removeFirmwareAlreadyAtDesiredVersion(hCtx *sm.HandlerContext, fws []*model.Firmware) ([]*model.Firmware, installSkippedReasons) {
func removeFirmwareAlreadyAtDesiredVersion(hCtx *sm.HandlerContext, fws []*model.Firmware) []*model.Firmware {
var toInstall []*model.Firmware

causes := installSkippedReasons{}

// only iterate the inventory once
invMap := make(map[string]string)
for _, cmp := range hCtx.Asset.Components {
invMap[strings.ToLower(cmp.Slug)] = cmp.FirmwareInstalled
}

fmtCause := func(component, cause, currentV, requestedV string) string {
if currentV != "" && requestedV != "" {
return fmt.Sprintf("[%s] %s, current=%s, requested=%s", component, cause, currentV, requestedV)
}

return fmt.Sprintf("[%s] %s", component, cause)
}

// XXX: this will drop firmware for components that are specified in
// the firmware set but not in the inventory. This is consistent with the
// desire of users to not require a force or a re-run to accomplish an
Expand All @@ -412,16 +385,16 @@ func removeFirmwareAlreadyAtDesiredVersion(hCtx *sm.HandlerContext, fws []*model
"component": fw.Component,
}).Warn(cause)

causes[fw.Component] = cause
hCtx.Task.Status.Append(fmtCause(fw.Component, cause, "", ""))

case strings.EqualFold(currentVersion, fw.Version):
cause := "component inventory firmware version matches set"
cause := "component firmware version equal"
hCtx.Logger.WithFields(logrus.Fields{
"component": fw.Component,
"version": fw.Version,
}).Debug(cause)

causes[fw.Component] = cause
hCtx.Task.Status.Append(fmtCause(fw.Component, cause, currentVersion, fw.Version))

default:
hCtx.Logger.WithFields(logrus.Fields{
Expand All @@ -431,8 +404,12 @@ func removeFirmwareAlreadyAtDesiredVersion(hCtx *sm.HandlerContext, fws []*model
}).Debug("firmware queued for install")

toInstall = append(toInstall, fw)

hCtx.Task.Status.Append(
fmtCause(fw.Component, "firmware queued for install", currentVersion, fw.Version),
)
}
}

return toInstall, causes
return toInstall
}
4 changes: 2 additions & 2 deletions internal/worker/task_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ func TestRemoveFirmwareAlreadyAtDesiredVersion(t *testing.T) {
},
}

got, reasons := removeFirmwareAlreadyAtDesiredVersion(ctx, fwSet)
require.Equal(t, 2, len(reasons))
got := removeFirmwareAlreadyAtDesiredVersion(ctx, fwSet)
require.Equal(t, 2, ctx.Task.Status)
require.Equal(t, 1, len(got))
require.Equal(t, expected[0], got[0])
}
Expand Down
17 changes: 5 additions & 12 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package worker
import (
"context"
"encoding/json"
"fmt"
"os"
"strconv"
"sync"
Expand Down Expand Up @@ -215,16 +214,14 @@ func (o *Worker) eventNak(event events.Message) {

func newTask(conditionID uuid.UUID, params *rctypes.FirmwareInstallTaskParameters) (model.Task, error) {
task := model.Task{
ID: conditionID,
ID: conditionID,
Parameters: *params,
Status: model.NewTaskStatusRecord("initialized task"),
}

//nolint:errcheck // this method returns nil unconditionally
task.SetState(model.StatePending)

task.Parameters.AssetID = params.AssetID
task.Parameters.ForceInstall = params.ForceInstall
task.Parameters.ResetBMCBeforeInstall = params.ResetBMCBeforeInstall
task.Parameters.DryRun = params.DryRun

if len(params.Firmwares) > 0 {
task.Parameters.Firmwares = params.Firmwares
task.FirmwarePlanMethod = model.FromRequestedFirmware
Expand Down Expand Up @@ -521,10 +518,6 @@ type statusEmitter struct {
logger *logrus.Logger
}

func statusInfoJSON(s string) json.RawMessage {
return []byte(fmt.Sprintf("{%q: %q}", "msg", s))
}

func (e *statusEmitter) Publish(hCtx *sm.HandlerContext) {
ctx, span := otel.Tracer(pkgName).Start(
hCtx.Ctx,
Expand All @@ -540,7 +533,7 @@ func (e *statusEmitter) Publish(hCtx *sm.HandlerContext) {
ConditionID: task.ID,
ServerID: task.Parameters.AssetID,
State: rctypes.State(task.State()),
Status: statusInfoJSON(task.Status),
Status: json.RawMessage(task.Status.String()),
},
}

Expand Down

0 comments on commit 61f522b

Please sign in to comment.