Skip to content

Commit

Permalink
publisher: use ctrl.Publisher which publishes both the Task, StatusValue
Browse files Browse the repository at this point in the history
  • Loading branch information
joelrebel committed Aug 15, 2024
1 parent 71f012a commit 8bb69a6
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 16 deletions.
52 changes: 39 additions & 13 deletions internal/model/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,56 @@ package model
import (
"context"

"github.com/metal-toolbox/rivets/events/controller"
"github.com/metal-toolbox/ctrl"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

var (
ErrPublishStatus = errors.New("error in publish Condition status")
ErrPublishTask = errors.New("error in publish Condition Task")
)

// Publisher defines methods to publish task information.
type Publisher interface {
Publish(ctx context.Context, task *Task)
Publish(ctx context.Context, task *Task) error
}

// StatusPublisher implements the Publisher interface
// to wrap the condition controller publish method
type StatusPublisher struct {
cp controller.ConditionStatusPublisher
logger *logrus.Entry
cp ctrl.Publisher
}

// TODO: this needs to publish the whole task to the KV
func NewNatsTaskStatusPublisher(cp controller.ConditionStatusPublisher) Publisher {
return &StatusPublisher{cp}
func NewTaskStatusPublisher(logger *logrus.Entry, cp ctrl.Publisher) Publisher {
return &StatusPublisher{
logger,
cp,
}
}

func (s *StatusPublisher) Publish(ctx context.Context, task *Task) {
s.cp.Publish(
ctx,
task.Asset.ID.String(),
task.State,
task.Status.MustMarshal(),
)
func (s *StatusPublisher) Publish(ctx context.Context, task *Task) error {
genericTask, err := CopyAsGenericTask(task)
if err != nil {
err = errors.Wrap(ErrPublishTask, err.Error())
s.logger.WithError(err).Warn("Task publish error")

return err
}

// overwrite credentials before this gets written back to the repository
genericTask.Server.BMCAddress = ""
genericTask.Server.BMCPassword = ""
genericTask.Server.BMCUser = ""

if err := s.cp.Publish(ctx, genericTask, false); err != nil {
err = errors.Wrap(ErrPublishStatus, err.Error())
s.logger.WithError(err).Error("Condition status publish error")

return err
}

s.logger.Trace("Condition Status publish successful")
return nil
}
3 changes: 2 additions & 1 deletion internal/outofband/action_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,8 @@ func (h *handler) pollFirmwareTaskStatus(ctx context.Context) error {

if h.publisher != nil && status != "" {
h.task.Status.Update(h.task.Status.Last(), statusPrefix+" -- "+status)
h.publisher.Publish(ctx, h.task)
//nolint:errcheck // method called logs errors if any
_ = h.publisher.Publish(ctx, h.task)
}

// error check returns when maxPollStatusAttempts have been reached
Expand Down
2 changes: 1 addition & 1 deletion internal/worker/outofband.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (h *ConditionTaskHandler) Handle(ctx context.Context, condition *rctypes.Co
model.RunOutofband,
task,
h.store,
model.NewNatsTaskStatusPublisher(publisher),
model.NewTaskStatusPublisher(hLogger, statusPublisher),
hLogger,
)

Expand Down
3 changes: 2 additions & 1 deletion internal/worker/task_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,5 +292,6 @@ func (t *handler) OnFailure(ctx context.Context, _ *model.Task) {
}

func (t *handler) Publish(ctx context.Context) {
t.Publisher.Publish(ctx, t.Task)
//nolint:errcheck // method called logs errors if any
_ = t.Publisher.Publish(ctx, t.Task)
}

0 comments on commit 8bb69a6

Please sign in to comment.