Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Agent actions token support (#23452) #23569

Merged
merged 1 commit into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 1 addition & 40 deletions x-pack/elastic-agent/pkg/agent/application/action_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package application

import (
"context"
"fmt"
"io"

Expand All @@ -19,6 +18,7 @@ import (
// take care of action policy change every other action are discarded. The store will only keep the
// last good action on disk, we assume that the action is added to the store after it was ACK with
// Fleet. The store is not threadsafe.
// ATTN!!!: THE actionStore is deprecated, please use and extend the stateStore instead. The actionStore will be eventually removed.
type actionStore struct {
log *logger.Logger
store storeLoad
Expand Down Expand Up @@ -148,42 +148,3 @@ type actionUnenrollSerializer struct {

// Add a guards between the serializer structs and the original struct.
var _ actionUnenrollSerializer = actionUnenrollSerializer(fleetapi.ActionUnenroll{})

// actionStoreAcker wraps an existing acker and will send any acked event to the action store,
// its up to the action store to decide if we need to persist the event for future replay or just
// discard the event.
type actionStoreAcker struct {
acker fleetAcker
store *actionStore
}

func (a *actionStoreAcker) Ack(ctx context.Context, action fleetapi.Action) error {
if err := a.acker.Ack(ctx, action); err != nil {
return err
}
a.store.Add(action)
return a.store.Save()
}

func (a *actionStoreAcker) Commit(ctx context.Context) error {
return a.acker.Commit(ctx)
}

func newActionStoreAcker(acker fleetAcker, store *actionStore) *actionStoreAcker {
return &actionStoreAcker{acker: acker, store: store}
}

func replayActions(
log *logger.Logger,
dispatcher dispatcher,
acker fleetAcker,
actions ...action,
) error {
log.Info("restoring current policy from disk")

if err := dispatcher.Dispatch(acker, actions...); err != nil {
return err
}

return nil
}
18 changes: 0 additions & 18 deletions x-pack/elastic-agent/pkg/agent/application/action_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package application

import (
"context"
"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -84,21 +83,4 @@ func TestActionStore(t *testing.T) {

require.Equal(t, ActionPolicyChange, actions[0])
}))

t.Run("when we ACK we save to disk",
withFile(func(t *testing.T, file string) {
ActionPolicyChange := &fleetapi.ActionPolicyChange{
ActionID: "abc123",
}

s := storage.NewDiskStore(file)
store, err := newActionStore(log, s)
require.NoError(t, err)

acker := newActionStoreAcker(&testAcker{}, store)
require.Equal(t, 0, len(store.Actions()))

require.NoError(t, acker.Ack(context.Background(), ActionPolicyChange))
require.Equal(t, 1, len(store.Actions()))
}))
}
21 changes: 21 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type fleetGateway struct {
unauthCounter int
statusController status.Controller
statusReporter status.Reporter
stateStore *stateStore
}

func newFleetGateway(
Expand All @@ -88,6 +89,7 @@ func newFleetGateway(
r fleetReporter,
acker fleetAcker,
statusController status.Controller,
stateStore *stateStore,
) (*fleetGateway, error) {

scheduler := scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter)
Expand All @@ -102,6 +104,7 @@ func newFleetGateway(
r,
acker,
statusController,
stateStore,
)
}

Expand All @@ -116,6 +119,7 @@ func newFleetGatewayWithScheduler(
r fleetReporter,
acker fleetAcker,
statusController status.Controller,
stateStore *stateStore,
) (*fleetGateway, error) {

// Backoff implementation doesn't support the using context as the shutdown mechanism.
Expand All @@ -140,6 +144,7 @@ func newFleetGatewayWithScheduler(
acker: acker,
statusReporter: statusController.Register("gateway"),
statusController: statusController,
stateStore: stateStore,
}, nil
}

Expand Down Expand Up @@ -209,9 +214,16 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
f.log.Error(errors.New("failed to load metadata", err))
}

// retrieve ack token from the store
ackToken := f.stateStore.AckToken()
if ackToken != "" {
f.log.Debug("using previously saved ack token: %v", ackToken)
}

// checkin
cmd := fleetapi.NewCheckinCmd(f.agentInfo, f.client)
req := &fleetapi.CheckinRequest{
AckToken: ackToken,
Events: ee,
Metadata: ecsMeta,
Status: f.statusController.StatusString(),
Expand All @@ -236,6 +248,15 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
return nil, err
}

// Save the latest ackToken
if resp.AckToken != "" {
f.stateStore.SetAckToken(resp.AckToken)
serr := f.stateStore.Save()
if serr != nil {
f.log.Errorf("failed to save the ack token, err: %v", serr)
}
}

// ack events so they are dropped from queue
ack()
return resp, nil
Expand Down
15 changes: 15 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/fleet_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
repo "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter"
fleetreporter "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter/fleet"
Expand Down Expand Up @@ -116,6 +118,9 @@ func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGat

ctx, cancel := context.WithCancel(context.Background())

stateStore, err := newStateStore(log, storage.NewDiskStore(info.AgentStateStoreFile()))
require.NoError(t, err)

gateway, err := newFleetGatewayWithScheduler(
ctx,
log,
Expand All @@ -127,6 +132,7 @@ func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGat
rep,
newNoopAcker(),
&noopController{},
stateStore,
)

go gateway.Start()
Expand Down Expand Up @@ -245,6 +251,9 @@ func TestFleetGateway(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
log, _ := logger.New("tst")
stateStore, err := newStateStore(log, storage.NewDiskStore(info.AgentStateStoreFile()))
require.NoError(t, err)

gateway, err := newFleetGatewayWithScheduler(
ctx,
log,
Expand All @@ -256,6 +265,7 @@ func TestFleetGateway(t *testing.T) {
getReporter(agentInfo, log, t),
newNoopAcker(),
&noopController{},
stateStore,
)

go gateway.Start()
Expand Down Expand Up @@ -331,6 +341,10 @@ func TestFleetGateway(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
log, _ := logger.New("tst")

stateStore, err := newStateStore(log, storage.NewDiskStore(info.AgentStateStoreFile()))
require.NoError(t, err)

gateway, err := newFleetGatewayWithScheduler(
ctx,
log,
Expand All @@ -345,6 +359,7 @@ func TestFleetGateway(t *testing.T) {
getReporter(agentInfo, log, t),
newNoopAcker(),
&noopController{},
stateStore,
)

require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application

import (
"context"
"fmt"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
)

type handlerAppAction struct {
log *logger.Logger
}

func (h *handlerAppAction) Handle(ctx context.Context, a action, acker fleetAcker) error {
h.log.Debugf("handlerAppAction: action '%+v' received", a)
action, ok := a.(*fleetapi.ActionApp)
if !ok {
return fmt.Errorf("invalid type, expected ActionApp and received %T", a)
}

_ = action

// TODO: handle app action

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (
// After running Unenroll agent is in idle state, non managed non standalone.
// For it to be operational again it needs to be either enrolled or reconfigured.
type handlerUnenroll struct {
log *logger.Logger
emitter emitterFunc
dispatcher programsDispatcher
closers []context.CancelFunc
actionStore *actionStore
log *logger.Logger
emitter emitterFunc
dispatcher programsDispatcher
closers []context.CancelFunc
stateStore *stateStore
}

func (h *handlerUnenroll) Handle(ctx context.Context, a action, acker fleetAcker) error {
Expand All @@ -44,10 +44,10 @@ func (h *handlerUnenroll) Handle(ctx context.Context, a action, acker fleetAcker
if err := acker.Commit(ctx); err != nil {
return err
}
} else if h.actionStore != nil {
} else if h.stateStore != nil {
// backup action for future start to avoid starting fleet gateway loop
h.actionStore.Add(a)
h.actionStore.Save()
h.stateStore.Add(a)
h.stateStore.Save()
}

// close fleet gateway loop
Expand Down
8 changes: 7 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/info/agent_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const agentInfoKey = "agent"

// defaultAgentActionStoreFile is the file that will contains the action that can be replayed after restart.
const defaultAgentActionStoreFile = "action_store.yml"
const defaultAgentStateStoreFile = "state.yml"

const defaultLogLevel = "info"

Expand All @@ -43,11 +44,16 @@ func AgentConfigFile() string {
return filepath.Join(paths.Config(), defaultAgentConfigFile)
}

// AgentActionStoreFile is the file that will contains the action that can be replayed after restart.
// AgentActionStoreFile is the file that contains the action that can be replayed after restart.
func AgentActionStoreFile() string {
return filepath.Join(paths.Home(), defaultAgentActionStoreFile)
}

// AgentStateStoreFile is the file that contains the persisted state of the agent including the action that can be replayed after restart.
func AgentStateStoreFile() string {
return filepath.Join(paths.Home(), defaultAgentStateStoreFile)
}

// updateLogLevel updates log level and persists it to disk.
func updateLogLevel(level string) error {
ai, err := loadAgentInfo(false, defaultLogLevel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ func loadFleetConfig(cfg *config.Config) (map[string]interface{}, error) {
return nil, err
}

as, err := newActionStore(log, storage.NewDiskStore(info.AgentActionStoreFile()))
stateStore, err := newStateStoreWithMigration(log, info.AgentActionStoreFile(), info.AgentStateStoreFile())
if err != nil {
return nil, err
}

for _, c := range as.Actions() {
for _, c := range stateStore.Actions() {
cfgChange, ok := c.(*fleetapi.ActionPolicyChange)
if !ok {
continue
Expand Down
Loading