Skip to content

Commit

Permalink
Agent actions token support (#23452)
Browse files Browse the repository at this point in the history
* Agent actions token support

* Make check happy

* Consolidate action store and the ack token store into state.yml store

* Make state storage thread safe
  • Loading branch information
aleksmaus authored Jan 19, 2021
1 parent 1471f97 commit a233f03
Show file tree
Hide file tree
Showing 15 changed files with 645 additions and 90 deletions.
41 changes: 1 addition & 40 deletions x-pack/elastic-agent/pkg/agent/application/action_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package application

import (
"context"
"fmt"
"io"

Expand All @@ -19,6 +18,7 @@ import (
// take care of action policy change every other action are discarded. The store will only keep the
// last good action on disk, we assume that the action is added to the store after it was ACK with
// Fleet. The store is not threadsafe.
// ATTN!!!: THE actionStore is deprecated, please use and extend the stateStore instead. The actionStore will be eventually removed.
type actionStore struct {
log *logger.Logger
store storeLoad
Expand Down Expand Up @@ -148,42 +148,3 @@ type actionUnenrollSerializer struct {

// Add a guards between the serializer structs and the original struct.
var _ actionUnenrollSerializer = actionUnenrollSerializer(fleetapi.ActionUnenroll{})

// actionStoreAcker wraps an existing acker and will send any acked event to the action store,
// its up to the action store to decide if we need to persist the event for future replay or just
// discard the event.
type actionStoreAcker struct {
acker fleetAcker
store *actionStore
}

func (a *actionStoreAcker) Ack(ctx context.Context, action fleetapi.Action) error {
if err := a.acker.Ack(ctx, action); err != nil {
return err
}
a.store.Add(action)
return a.store.Save()
}

func (a *actionStoreAcker) Commit(ctx context.Context) error {
return a.acker.Commit(ctx)
}

func newActionStoreAcker(acker fleetAcker, store *actionStore) *actionStoreAcker {
return &actionStoreAcker{acker: acker, store: store}
}

func replayActions(
log *logger.Logger,
dispatcher dispatcher,
acker fleetAcker,
actions ...action,
) error {
log.Info("restoring current policy from disk")

if err := dispatcher.Dispatch(acker, actions...); err != nil {
return err
}

return nil
}
18 changes: 0 additions & 18 deletions x-pack/elastic-agent/pkg/agent/application/action_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package application

import (
"context"
"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -84,21 +83,4 @@ func TestActionStore(t *testing.T) {

require.Equal(t, ActionPolicyChange, actions[0])
}))

t.Run("when we ACK we save to disk",
withFile(func(t *testing.T, file string) {
ActionPolicyChange := &fleetapi.ActionPolicyChange{
ActionID: "abc123",
}

s := storage.NewDiskStore(file)
store, err := newActionStore(log, s)
require.NoError(t, err)

acker := newActionStoreAcker(&testAcker{}, store)
require.Equal(t, 0, len(store.Actions()))

require.NoError(t, acker.Ack(context.Background(), ActionPolicyChange))
require.Equal(t, 1, len(store.Actions()))
}))
}
21 changes: 21 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type fleetGateway struct {
unauthCounter int
statusController status.Controller
statusReporter status.Reporter
stateStore *stateStore
}

func newFleetGateway(
Expand All @@ -88,6 +89,7 @@ func newFleetGateway(
r fleetReporter,
acker fleetAcker,
statusController status.Controller,
stateStore *stateStore,
) (*fleetGateway, error) {

scheduler := scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter)
Expand All @@ -102,6 +104,7 @@ func newFleetGateway(
r,
acker,
statusController,
stateStore,
)
}

Expand All @@ -116,6 +119,7 @@ func newFleetGatewayWithScheduler(
r fleetReporter,
acker fleetAcker,
statusController status.Controller,
stateStore *stateStore,
) (*fleetGateway, error) {

// Backoff implementation doesn't support the using context as the shutdown mechanism.
Expand All @@ -140,6 +144,7 @@ func newFleetGatewayWithScheduler(
acker: acker,
statusReporter: statusController.Register("gateway"),
statusController: statusController,
stateStore: stateStore,
}, nil
}

Expand Down Expand Up @@ -209,9 +214,16 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
f.log.Error(errors.New("failed to load metadata", err))
}

// retrieve ack token from the store
ackToken := f.stateStore.AckToken()
if ackToken != "" {
f.log.Debug("using previously saved ack token: %v", ackToken)
}

// checkin
cmd := fleetapi.NewCheckinCmd(f.agentInfo, f.client)
req := &fleetapi.CheckinRequest{
AckToken: ackToken,
Events: ee,
Metadata: ecsMeta,
Status: f.statusController.StatusString(),
Expand All @@ -236,6 +248,15 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
return nil, err
}

// Save the latest ackToken
if resp.AckToken != "" {
f.stateStore.SetAckToken(resp.AckToken)
serr := f.stateStore.Save()
if serr != nil {
f.log.Errorf("failed to save the ack token, err: %v", serr)
}
}

// ack events so they are dropped from queue
ack()
return resp, nil
Expand Down
15 changes: 15 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/fleet_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
repo "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter"
fleetreporter "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter/fleet"
Expand Down Expand Up @@ -117,6 +119,9 @@ func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGat
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

stateStore, err := newStateStore(log, storage.NewDiskStore(info.AgentStateStoreFile()))
require.NoError(t, err)

gateway, err := newFleetGatewayWithScheduler(
ctx,
log,
Expand All @@ -128,6 +133,7 @@ func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGat
rep,
newNoopAcker(),
&noopController{},
stateStore,
)

require.NoError(t, err)
Expand Down Expand Up @@ -242,6 +248,9 @@ func TestFleetGateway(t *testing.T) {
defer cancel()

log, _ := logger.New("tst")
stateStore, err := newStateStore(log, storage.NewDiskStore(info.AgentStateStoreFile()))
require.NoError(t, err)

gateway, err := newFleetGatewayWithScheduler(
ctx,
log,
Expand All @@ -253,6 +262,7 @@ func TestFleetGateway(t *testing.T) {
getReporter(agentInfo, log, t),
newNoopAcker(),
&noopController{},
stateStore,
)

require.NoError(t, err)
Expand Down Expand Up @@ -328,6 +338,10 @@ func TestFleetGateway(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
log, _ := logger.New("tst")

stateStore, err := newStateStore(log, storage.NewDiskStore(info.AgentStateStoreFile()))
require.NoError(t, err)

gateway, err := newFleetGatewayWithScheduler(
ctx,
log,
Expand All @@ -342,6 +356,7 @@ func TestFleetGateway(t *testing.T) {
getReporter(agentInfo, log, t),
newNoopAcker(),
&noopController{},
stateStore,
)

require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application

import (
"context"
"fmt"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
)

type handlerAppAction struct {
log *logger.Logger
}

func (h *handlerAppAction) Handle(ctx context.Context, a action, acker fleetAcker) error {
h.log.Debugf("handlerAppAction: action '%+v' received", a)
action, ok := a.(*fleetapi.ActionApp)
if !ok {
return fmt.Errorf("invalid type, expected ActionApp and received %T", a)
}

_ = action

// TODO: handle app action

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (
// After running Unenroll agent is in idle state, non managed non standalone.
// For it to be operational again it needs to be either enrolled or reconfigured.
type handlerUnenroll struct {
log *logger.Logger
emitter emitterFunc
dispatcher programsDispatcher
closers []context.CancelFunc
actionStore *actionStore
log *logger.Logger
emitter emitterFunc
dispatcher programsDispatcher
closers []context.CancelFunc
stateStore *stateStore
}

func (h *handlerUnenroll) Handle(ctx context.Context, a action, acker fleetAcker) error {
Expand All @@ -44,10 +44,10 @@ func (h *handlerUnenroll) Handle(ctx context.Context, a action, acker fleetAcker
if err := acker.Commit(ctx); err != nil {
return err
}
} else if h.actionStore != nil {
} else if h.stateStore != nil {
// backup action for future start to avoid starting fleet gateway loop
h.actionStore.Add(a)
h.actionStore.Save()
h.stateStore.Add(a)
h.stateStore.Save()
}

// close fleet gateway loop
Expand Down
8 changes: 7 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/info/agent_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const agentInfoKey = "agent"

// defaultAgentActionStoreFile is the file that will contains the action that can be replayed after restart.
const defaultAgentActionStoreFile = "action_store.yml"
const defaultAgentStateStoreFile = "state.yml"

const defaultLogLevel = "info"

Expand All @@ -43,11 +44,16 @@ func AgentConfigFile() string {
return filepath.Join(paths.Config(), defaultAgentConfigFile)
}

// AgentActionStoreFile is the file that will contains the action that can be replayed after restart.
// AgentActionStoreFile is the file that contains the action that can be replayed after restart.
func AgentActionStoreFile() string {
return filepath.Join(paths.Home(), defaultAgentActionStoreFile)
}

// AgentStateStoreFile is the file that contains the persisted state of the agent including the action that can be replayed after restart.
func AgentStateStoreFile() string {
return filepath.Join(paths.Home(), defaultAgentStateStoreFile)
}

// updateLogLevel updates log level and persists it to disk.
func updateLogLevel(level string) error {
ai, err := loadAgentInfo(false, defaultLogLevel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ func loadFleetConfig(cfg *config.Config) (map[string]interface{}, error) {
return nil, err
}

as, err := newActionStore(log, storage.NewDiskStore(info.AgentActionStoreFile()))
stateStore, err := newStateStoreWithMigration(log, info.AgentActionStoreFile(), info.AgentStateStoreFile())
if err != nil {
return nil, err
}

for _, c := range as.Actions() {
for _, c := range stateStore.Actions() {
cfgChange, ok := c.(*fleetapi.ActionPolicyChange)
if !ok {
continue
Expand Down
Loading

0 comments on commit a233f03

Please sign in to comment.