Skip to content

Commit

Permalink
add a KV status publisher for Flasher (#21)
Browse files Browse the repository at this point in the history
* add a KV status publisher for Flasher

* implement review comments
  • Loading branch information
DoctorVin authored Jun 2, 2023
1 parent 54c65e5 commit 0c35a03
Show file tree
Hide file tree
Showing 13 changed files with 264 additions and 32 deletions.
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ run:
# build-tags:
skip-dirs:
- internal/fixtures
skip-files:
- "(.*/)*.*_test.go"

issues:
exclude-rules:
Expand Down
3 changes: 3 additions & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var cmdRun = &cobra.Command{

// run worker command
var (
useStatusKV bool
dryrun bool
faultInjection bool
storeKind string
Expand Down Expand Up @@ -76,6 +77,7 @@ func runWorker(ctx context.Context) {
w := worker.New(
flasher.Config.FacilityCode,
dryrun,
useStatusKV,
faultInjection,
flasher.Config.Concurrency,
stream,
Expand All @@ -101,6 +103,7 @@ func initInventory(ctx context.Context, config *app.Configuration, logger *logru
func init() {
cmdRun.PersistentFlags().StringVar(&storeKind, "store", "", "inventory store to lookup devices for update - 'serverservice' or an inventory file with a .yml/.yaml extenstion")
cmdRun.PersistentFlags().BoolVarP(&dryrun, "dry-run", "", false, "In dryrun mode, the worker actions the task without installing firmware")
cmdRun.PersistentFlags().BoolVarP(&useStatusKV, "use-kv", "", false, "when this is true, flasher writes status to a NATS KV store instead of sending reply messages")
cmdRun.PersistentFlags().BoolVarP(&faultInjection, "fault-injection", "", false, "Tasks can include a Fault attribute to allow fault injection for development purposes")

if err := cmdRun.MarkPersistentFlagRequired("store"); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/jpillora/backoff v1.0.0
github.com/metal-toolbox/conditionorc v0.0.0-20230601150920-843e71fd74f8
github.com/mitchellh/mapstructure v1.5.0
github.com/nats-io/nats-server/v2 v2.9.15
github.com/nats-io/nats.go v1.25.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.15.1
Expand Down Expand Up @@ -79,13 +80,15 @@ require (
github.com/jacobweinstock/iamt v0.0.0-20230304043040-a6b4a1001123 // indirect
github.com/jmoiron/sqlx v1.3.5 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.4.0 // indirect
Expand Down Expand Up @@ -134,6 +137,7 @@ require (
golang.org/x/exp v0.0.0-20230519143937-03e91628a987 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.119.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1427,6 +1427,7 @@ github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4=
github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk=
github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
Expand Down Expand Up @@ -1591,6 +1592,7 @@ github.com/nats-io/jwt/v2 v2.4.0 h1:1woVcq37qhNwJOeZ4ZoRy5NJU5bvbtGsIammf2GpuJQ=
github.com/nats-io/jwt/v2 v2.4.0/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.5.0/go.mod h1:Kj86UtrXAL6LwYRA6H4RqzkHhK0Vcv2ZnKD5WbQ1t3g=
github.com/nats-io/nats-server/v2 v2.9.15 h1:MuwEJheIwpvFgqvbs20W8Ish2azcygjf4Z0liVu2I4c=
github.com/nats-io/nats-server/v2 v2.9.15/go.mod h1:QlCTy115fqpx4KSOPFIxSV7DdI6OxtZsGOL1JLdeRlE=
github.com/nats-io/nats.go v1.12.1/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.25.0 h1:t5/wCPGciR7X3Mu8QOi4jiJaXaWM8qtkLu4lzGZvYHE=
github.com/nats-io/nats.go v1.25.0/go.mod h1:D2WALIhz7V8M0pH8Scx8JZXlg6Oqz5VG+nQkK8nJdvg=
Expand Down
2 changes: 1 addition & 1 deletion internal/outofband/action_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ func (h *actionHandler) PublishStatus(_ sw.StateSwitch, args sw.TransitionArgs)
return sm.ErrInvalidTransitionHandler
}

tctx.Publisher.Publish(tctx, tctx.Task)
tctx.Publisher.Publish(tctx)

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/outofband/actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func newTaskFixture(status string) *model.Task {
// eventEmitter implements the statemachine.Publisher interface
type eventEmitter struct{}

func (e *eventEmitter) Publish(_ *sm.HandlerContext, _ *model.Task) {}
func (e *eventEmitter) Publish(_ *sm.HandlerContext) {}

func newtaskHandlerContextFixture(task *model.Task, asset *model.Asset) *sm.HandlerContext {
repository, _ := store.NewMockInventory()
Expand Down
4 changes: 2 additions & 2 deletions internal/statemachine/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (a *ActionStateMachine) Run(ctx context.Context, action *model.Action, tctx
string(transitionType),
)

tctx.Publisher.Publish(tctx, tctx.Task)
tctx.Publisher.Publish(tctx)

// return on context cancellation
if ctx.Err() != nil {
Expand Down Expand Up @@ -202,7 +202,7 @@ func (a *ActionStateMachine) Run(ctx context.Context, action *model.Action, tctx
string(transitionType),
)

tctx.Publisher.Publish(tctx, tctx.Task)
tctx.Publisher.Publish(tctx)
}

// run transition success handler
Expand Down
8 changes: 6 additions & 2 deletions internal/statemachine/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/metal-toolbox/flasher/internal/store"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.hollow.sh/toolbox/events/registry"
)

const (
Expand All @@ -34,7 +35,7 @@ var (

// Publisher defines methods to publish task information.
type Publisher interface {
Publish(ctx *HandlerContext, task *model.Task)
Publish(ctx *HandlerContext)
}

// HandlerContext holds references to objects required to complete firmware install task and action transitions.
Expand Down Expand Up @@ -74,7 +75,7 @@ type HandlerContext struct {
Logger *logrus.Entry

// WorkerID is the identifier for the worker executing this task.
WorkerID string
WorkerID registry.ControllerID

// ActionStateMachines are sub-statemachines of this Task
// each firmware applicable has a Action statemachine that is
Expand All @@ -87,6 +88,9 @@ type HandlerContext struct {
//
// It is up to the Action handler implementations to ensure the dry run works as described.
Dryrun bool

// LastRev is the last revision of the status data for this task stored in NATS KV
LastRev uint64
}

// TaskTransitioner defines stateswitch methods that handle state transitions.
Expand Down
103 changes: 103 additions & 0 deletions internal/worker/kv_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
//nolint:gomnd //useless opinions
package worker

import (
"encoding/json"
"fmt"
"time"

sm "github.com/metal-toolbox/flasher/internal/statemachine"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"

"go.hollow.sh/toolbox/events"
"go.hollow.sh/toolbox/events/pkg/kv"
)

// statusKVName is the name of the KV bucket that task status is written to.
var statusKVName = "flasher-status"

// defaultKVOpts are the bucket options used by NewStatusKVPublisher when the
// caller does not pass any options of its own.
var defaultKVOpts = []kv.Option{
	kv.WithReplicas(3),
	kv.WithDescription("flasher condition status tracking"),
	kv.WithTTL(10 * 24 * time.Hour),
}

// statusKVPublisher publishes task status by writing it to a NATS key-value handle.
type statusKVPublisher struct {
	kv  nats.KeyValue
	log *logrus.Logger
}

// Publish implements the statemachine Publisher interface.
//
// The status derived from hCtx is stored under the key
// "<asset facility code>/<task ID>". When hCtx.LastRev is zero the key is
// created, otherwise it is updated against hCtx.LastRev. On success
// hCtx.LastRev is set to the revision returned by the write; on failure a
// warning is logged and hCtx.LastRev is left as it was.
func (s *statusKVPublisher) Publish(hCtx *sm.HandlerContext) {
	key := fmt.Sprintf("%s/%s", hCtx.Asset.FacilityCode, hCtx.Task.ID.String())
	payload := statusFromContext(hCtx)

	var (
		newRev   uint64
		writeErr error
	)

	switch prevRev := hCtx.LastRev; prevRev {
	case 0:
		newRev, writeErr = s.kv.Create(key, payload)
	default:
		newRev, writeErr = s.kv.Update(key, payload, prevRev)
	}

	if writeErr == nil {
		hCtx.LastRev = newRev
		return
	}

	fields := logrus.Fields{
		"task_id":  hCtx.Task.ID.String(),
		"last_rev": hCtx.LastRev,
	}
	s.log.WithError(writeErr).WithFields(fields).Warn("unable to write task status")
}

// statusValue is the JSON document that describes the status of a task.
type statusValue struct {
	UpdatedAt time.Time       `json:"updated"`
	WorkerID  string          `json:"worker"`
	Target    string          `json:"target"`
	State     string          `json:"state"`
	Status    json.RawMessage `json:"status"`
	// WorkSpec json.RawMessage `json:"spec"`
}

// MustBytes returns the JSON encoding of v.
// It panics when v cannot be serialized to JSON.
func (v *statusValue) MustBytes() []byte {
	encoded, err := json.Marshal(v)
	if err == nil {
		return encoded
	}

	panic("unable to serialize status value: " + err.Error())
}

// statusFromContext builds a statusValue from the worker ID, asset ID, task
// state and task status held in hCtx, stamps it with the current time and
// returns its JSON encoding.
func statusFromContext(hCtx *sm.HandlerContext) []byte {
	var sv statusValue

	sv.WorkerID = hCtx.WorkerID.String()
	sv.Target = hCtx.Asset.ID.String()
	sv.State = string(hCtx.Task.State())
	sv.Status = statusInfoJSON(hCtx.Task.Status)
	sv.UpdatedAt = time.Now()

	return sv.MustBytes()
}

// NewStatusKVPublisher returns a Publisher that writes task status to the
// status KV bucket, creating or binding the bucket on the given stream.
//
// The stream must be a *events.NatsJetstream. When opts is non-empty it
// replaces the default bucket options entirely; the two sets are not merged.
// An unsupported stream or a bucket error is reported through log.Fatal.
func NewStatusKVPublisher(s events.Stream, log *logrus.Logger, opts ...kv.Option) sm.Publisher {
	js, isNats := s.(*events.NatsJetstream)
	if !isNats {
		log.Fatal("status-kv publisher is only supported on NATS")
	}

	bucketOpts := opts
	if len(bucketOpts) == 0 {
		bucketOpts = defaultKVOpts
	}

	bucket, err := kv.CreateOrBindKVBucket(js, statusKVName, bucketOpts...)
	if err != nil {
		log.WithError(err).Fatal("unable to bind status KV bucket")
	}

	return &statusKVPublisher{kv: bucket, log: log}
}
106 changes: 106 additions & 0 deletions internal/worker/kv_status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package worker

import (
"context"
"encoding/json"
"os"
"testing"
"time"

"github.com/google/uuid"
"github.com/metal-toolbox/flasher/internal/model"
sm "github.com/metal-toolbox/flasher/internal/statemachine"
"github.com/nats-io/nats-server/v2/server"
srvtest "github.com/nats-io/nats-server/v2/test"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"go.hollow.sh/toolbox/events"
"go.hollow.sh/toolbox/events/pkg/kv"
"go.hollow.sh/toolbox/events/registry"
)

// startJetStreamServer runs a test NATS server with JetStream enabled, using
// the default test options with the port set to -1.
func startJetStreamServer(t *testing.T) *server.Server {
	t.Helper()

	cfg := srvtest.DefaultTestOptions
	cfg.Port, cfg.JetStream = -1, true

	return srvtest.RunServer(&cfg)
}

// jetStreamContext connects to the given server and returns the connection
// together with a JetStream context using a 10 second max wait. The test is
// failed immediately if either step returns an error.
func jetStreamContext(t *testing.T, s *server.Server) (*nats.Conn, nats.JetStreamContext) {
	t.Helper()

	conn, connErr := nats.Connect(s.ClientURL())
	if connErr != nil {
		t.Fatalf("connect => %v", connErr)
	}

	jsCtx, jsErr := conn.JetStream(nats.MaxWait(10 * time.Second))
	if jsErr != nil {
		t.Fatalf("JetStream => %v", jsErr)
	}

	return conn, jsCtx
}

// shutdownJetStream shuts the server down, removes its JetStream store
// directory when one is configured, and then waits for shutdown to complete.
func shutdownJetStream(t *testing.T, s *server.Server) {
	t.Helper()

	// read the store directory from the JetStream config before shutting down
	storeDir := ""
	if jsCfg := s.JetStreamConfig(); jsCfg != nil {
		storeDir = jsCfg.StoreDir
	}

	s.Shutdown()

	if len(storeDir) > 0 {
		if rmErr := os.RemoveAll(storeDir); rmErr != nil {
			t.Fatalf("Unable to remove storage %q: %v", storeDir, rmErr)
		}
	}

	s.WaitForShutdown()
}

// TestPublisher runs the KV status publisher against an in-process JetStream
// server. It publishes an initial status and then a second one after a state
// change, checking each time that the revision recorded on the handler context
// matches the revision stored in the "flasher-status" bucket, and that the
// stored payload carries the expected target and status.
func TestPublisher(t *testing.T) {
	srv := startJetStreamServer(t)
	defer shutdownJetStream(t, srv)
	nc, js := jetStreamContext(t, srv) // nc is closed on evJS.Close(), js needs no cleanup
	evJS := events.NewJetstreamFromConn(nc)
	defer evJS.Close()

	pub := NewStatusKVPublisher(evJS, logrus.New(), kv.WithReplicas(1))
	require.NotNil(t, pub, "publisher constructor")

	readHandle, err := js.KeyValue("flasher-status")
	require.NoError(t, err, "read handle")

	taskID := uuid.New()
	assetID := uuid.New()
	key := "fac13/" + taskID.String()

	testContext := &sm.HandlerContext{
		Ctx: context.TODO(),
		Task: &model.Task{
			ID:     taskID,
			Status: "some-status",
		},
		WorkerID: registry.GetID("kvtest"),
		Asset: &model.Asset{
			ID:           assetID,
			FacilityCode: "fac13",
		},
	}

	testContext.Task.SetState(model.StatePending)
	require.NotPanics(t, func() { pub.Publish(testContext) }, "publish initial")
	// LastRev is a uint64; comparing it with an untyped 0 (an int) can never
	// be equal under testify's equality, so the zero must be typed.
	require.NotEqual(t, uint64(0), testContext.LastRev, "last rev - 1")

	entry, err := readHandle.Get(key)
	// fail here rather than dereferencing a nil entry below
	require.NoError(t, err, "get initial entry")
	require.Equal(t, entry.Revision(), testContext.LastRev, "last rev - 2")

	sv := &statusValue{}
	err = json.Unmarshal(entry.Value(), sv)
	require.NoError(t, err, "unmarshal")

	require.Equal(t, assetID.String(), sv.Target, "sv Target")
	require.Equal(t, json.RawMessage(`{"msg":"some-status"}`), sv.Status, "sv Status")

	firstRev := testContext.LastRev

	testContext.Task.SetState(model.StateActive)
	require.NotPanics(t, func() { pub.Publish(testContext) }, "publish revision")
	// Publish only logs on failure, so a failed update would leave LastRev at
	// the first revision and still match the stored entry; require a change.
	require.NotEqual(t, firstRev, testContext.LastRev, "last rev changes on update")

	entry, err = readHandle.Get(key)
	require.NoError(t, err, "get updated entry")
	require.Equal(t, entry.Revision(), testContext.LastRev, "last rev - 3")
}
Loading

0 comments on commit 0c35a03

Please sign in to comment.