Skip to content

Commit

Permalink
model/task: use Task type from rivets
Browse files Browse the repository at this point in the history
The rivets.Task type is to be used by all controllers to execute work
recieved as conditions
  • Loading branch information
joelrebel committed Aug 15, 2024
1 parent 6e4ac27 commit 4304982
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 183 deletions.
244 changes: 159 additions & 85 deletions internal/model/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package model

import (
"encoding/json"
"time"
"reflect"

"github.com/google/uuid"
"github.com/mitchellh/copystructure"
"github.com/pkg/errors"

rctypes "github.com/metal-toolbox/rivets/condition"
rtypes "github.com/metal-toolbox/rivets/types"
)

// InstallMethod is one of 'outofband' OR 'inband'
Expand Down Expand Up @@ -40,155 +42,227 @@ const (
StateSucceeded = rctypes.Succeeded
StateFailed = rctypes.Failed

TaskVersion = "1.0"
TaskDataStructVersion = "1.0"
)

var (
errTaskFirmwareParam = errors.New("firmware task parameters error")
)

// Task is a top level unit of work handled by flasher.
//
// A task performs one or more actions, each of the action corresponds to a Firmware planned for install.
//
// nolint:govet // fieldalignment - struct is better readable in its current form.
type Task struct {
// StructVersion indicates the Task object version and is used to determine Task compatibility.
StructVersion string `json:"task_version"`
// Alias parameterized model.Task
type Task rctypes.Task[*rctypes.FirmwareInstallTaskParameters, *TaskData]

// Task unique identifier, this is set to the Condition identifier.
ID uuid.UUID `json:"id"`
func (t *Task) SetState(s rctypes.State) {
t.State = s
}

// state is the state of the install
State rctypes.State `json:"state"`
func (t *Task) MustMarshal() json.RawMessage {
b, err := json.Marshal(t)
if err != nil {
panic(err)
}

// status holds informational data on the state
Status StatusRecord `json:"status"`
return b
}

type TaskData struct {
StructVersion string `json:"struct_version"`
// Flasher determines the firmware to be installed for each component based on the firmware plan method.
FirmwarePlanMethod FirmwarePlanMethod `json:"firmware_plan_method,omitempty"`

// ActionsPlanned to be executed for each firmware to be installed.
ActionsPlanned Actions `json:"actions_planned,omitempty"`

// Parameters for this task
Parameters rctypes.FirmwareInstallTaskParameters `json:"parameters,omitempty"`

// Fault is a field to inject failures into a flasher task execution,
// this is set from the Condition only when the worker is run with fault-injection enabled.
Fault *rctypes.Fault `json:"fault,omitempty"`

// FacilityCode identifies the facility this task is to be executed in.
FacilityCode string `json:"facility_code"`

// Data is an arbitrary key values map available to all task, action handler methods.
Data map[string]string `json:"data,omitempty"`
// Scratch is an arbitrary key values map available to all task, action handler methods.
Scratch map[string]string `json:"data,omitempty"`
}

// Asset holds attributes about the device under firmware install.
Asset *Asset `json:"asset,omitempty"`
func (td *TaskData) MapStringInterfaceToStruct(m map[string]interface{}) error {
jsonData, err := json.Marshal(m)
if err != nil {
return err
}

// WorkerID is the identifier for the worker executing this task.
WorkerID string `json:"worker_id,omitempty"`
return json.Unmarshal(jsonData, td)
}

CreatedAt time.Time `json:"created_at,omitempty"`
UpdatedAt time.Time `json:"updated_at,omitempty"`
CompletedAt time.Time `json:"completed_at,omitempty"`
func (td *TaskData) Marshal() (json.RawMessage, error) {
return json.Marshal(td)
}

// SetState implements the Task runner interface
func (t *Task) SetState(state rctypes.State) error {
t.State = state
return nil
func (td *TaskData) Unmarshal(r json.RawMessage) error {
return json.Unmarshal(r, td)
}

func NewTask(conditionID uuid.UUID, params *rctypes.FirmwareInstallTaskParameters) (Task, error) {
func NewTask(conditionID uuid.UUID, kind rctypes.Kind, params *rctypes.FirmwareInstallTaskParameters) (Task, error) {
t := Task{
StructVersion: TaskVersion,
StructVersion: rctypes.TaskVersion1,
ID: conditionID,
Status: NewTaskStatusRecord("initialized task"),
Kind: kind,
Data: &TaskData{},
Status: rctypes.NewTaskStatusRecord("initialized task"),
State: StatePending,
Parameters: *params,
Data: make(map[string]string),
Parameters: params,
}

t.Data.Scratch = make(map[string]string)
if len(params.Firmwares) > 0 {
t.Parameters.Firmwares = params.Firmwares
t.FirmwarePlanMethod = FromRequestedFirmware
t.Data.FirmwarePlanMethod = FromRequestedFirmware

return t, nil
}

if params.FirmwareSetID != uuid.Nil {
t.Parameters.FirmwareSetID = params.FirmwareSetID
t.FirmwarePlanMethod = FromFirmwareSet
t.Data.FirmwarePlanMethod = FromFirmwareSet

return t, nil
}

return t, errors.Wrap(errTaskFirmwareParam, "no firmware list or firmwareSetID specified")
}

func NewTaskStatusRecord(s string) StatusRecord {
sr := StatusRecord{}
if s == "" {
return sr
}
func convTaskParams(params any) (*rctypes.FirmwareInstallTaskParameters, error) {
errParamsConv := errors.New("error in Task.Parameters conversion")

sr.Append(s)
fwInstallParams := &rctypes.FirmwareInstallTaskParameters{}
switch v := params.(type) {
// When unpacked from a http request by the condition orc client,
// Parameters are of this type.
case map[string]interface{}:
if err := fwInstallParams.MapStringInterfaceToStruct(v); err != nil {
return nil, errors.Wrap(errParamsConv, err.Error())
}
// When received over NATS its of this type.
case json.RawMessage:
if err := fwInstallParams.Unmarshal(v); err != nil {
return nil, errors.Wrap(errParamsConv, err.Error())
}
default:
msg := "Task.Parameters expected to be one of map[string]interface{} or json.RawMessage, current type: " + reflect.TypeOf(params).String()
return nil, errors.Wrap(errParamsConv, msg)
}

return sr
return fwInstallParams, nil
}

type StatusRecord struct {
StatusMsgs []StatusMsg `json:"records"`
}
func convTaskData(data any) (*TaskData, error) {
errDataConv := errors.New("error in Task.Data conversion")

type StatusMsg struct {
Timestamp time.Time `json:"ts,omitempty"`
Msg string `json:"msg,omitempty"`
taskData := &TaskData{}
switch v := data.(type) {
// When unpacked from a http request by the condition orc client,
// Parameters are of this type.
case map[string]interface{}:
if err := taskData.MapStringInterfaceToStruct(v); err != nil {
return nil, errors.Wrap(errDataConv, err.Error())
}
// When received over NATS its of this type.
case json.RawMessage:
if err := taskData.Unmarshal(v); err != nil {
return nil, errors.Wrap(errDataConv, err.Error())
}
default:
msg := "Task.Data expected to be one of map[string]interface{} or json.RawMessage, current type: " + reflect.TypeOf(data).String()
return nil, errors.Wrap(errDataConv, msg)
}

return taskData, nil
}

func (sr *StatusRecord) Append(s string) {
if s == "" {
return
func CopyAsFwInstallTask(task *rctypes.Task[any, any]) (*Task, error) {
errTaskConv := errors.New("error in generic Task conversion")

params, err := convTaskParams(task.Parameters)
if err != nil {
return nil, errors.Wrap(errTaskConv, err.Error())
}

for _, r := range sr.StatusMsgs {
if r.Msg == s {
return
}
data, err := convTaskData(task.Data)
if err != nil {
return nil, errors.Wrap(errTaskConv, err.Error())
}

if len(sr.StatusMsgs) > 4 {
sr.StatusMsgs = sr.StatusMsgs[1:]
// deep copy fields referenced by pointer
asset, err := copystructure.Copy(task.Server)
if err != nil {
return nil, errors.Wrap(errTaskConv, err.Error()+": Task.Server")
}

n := StatusMsg{Timestamp: time.Now(), Msg: s}
fault, err := copystructure.Copy(task.Fault)
if err != nil {
return nil, errors.Wrap(errTaskConv, err.Error()+": Task.Fault")
}

sr.StatusMsgs = append(sr.StatusMsgs, n)
}
if len(params.Firmwares) > 0 {
data.FirmwarePlanMethod = FromRequestedFirmware
}

func (sr *StatusRecord) Last() string {
if len(sr.StatusMsgs) == 0 {
return ""
if params.FirmwareSetID != uuid.Nil {
data.FirmwarePlanMethod = FromFirmwareSet
}

return sr.StatusMsgs[len(sr.StatusMsgs)-1].Msg
return &Task{
StructVersion: task.StructVersion,
ID: task.ID,
Kind: task.Kind,
State: task.State,
Status: task.Status,
Data: data,
Parameters: params,
Fault: fault.(*rctypes.Fault),
FacilityCode: task.FacilityCode,
Server: asset.(*rtypes.Server),
WorkerID: task.WorkerID,
TraceID: task.TraceID,
SpanID: task.SpanID,
CreatedAt: task.CreatedAt,
UpdatedAt: task.UpdatedAt,
CompletedAt: task.CompletedAt,
}, nil
}

func (sr *StatusRecord) Update(currentMsg, newMsg string) {
for idx, r := range sr.StatusMsgs {
if r.Msg == currentMsg {
sr.StatusMsgs[idx].Msg = newMsg
}
func CopyAsGenericTask(task *Task) (*rctypes.Task[any, any], error) {
errTaskConv := errors.New("error in firmware install Task conversion")

paramsJSON, err := task.Parameters.Marshal()
if err != nil {
return nil, errors.Wrap(errTaskConv, err.Error()+": Task.Parameters")
}
}

func (sr *StatusRecord) MustMarshal() json.RawMessage {
b, err := json.Marshal(sr)
dataJSON, err := task.Data.Marshal()
if err != nil {
panic(err)
return nil, errors.Wrap(errTaskConv, err.Error()+": Task.Data")
}

return b
// deep copy fields referenced by pointer
asset, err := copystructure.Copy(task.Server)
if err != nil {
return nil, errors.Wrap(errTaskConv, err.Error()+": Task.Server")
}

fault, err := copystructure.Copy(task.Fault)
if err != nil {
return nil, errors.Wrap(errTaskConv, err.Error()+": Task.Fault")
}

return &rctypes.Task[any, any]{
StructVersion: task.StructVersion,
ID: task.ID,
Kind: task.Kind,
State: task.State,
Status: task.Status,
Data: dataJSON,
Parameters: paramsJSON,
Fault: fault.(*rctypes.Fault),
FacilityCode: task.FacilityCode,
Server: asset.(*rtypes.Server),
WorkerID: task.WorkerID,
TraceID: task.TraceID,
SpanID: task.SpanID,
CreatedAt: task.CreatedAt,
UpdatedAt: task.UpdatedAt,
CompletedAt: task.CompletedAt,
}, nil
}
Loading

0 comments on commit 4304982

Please sign in to comment.