diff --git a/x-pack/elastic-agent/pkg/agent/application/action_store.go b/x-pack/elastic-agent/pkg/agent/application/action_store.go index ce4ea785cf71..646ab828ef5e 100644 --- a/x-pack/elastic-agent/pkg/agent/application/action_store.go +++ b/x-pack/elastic-agent/pkg/agent/application/action_store.go @@ -5,7 +5,6 @@ package application import ( - "context" "fmt" "io" @@ -19,6 +18,7 @@ import ( // take care of action policy change every other action are discarded. The store will only keep the // last good action on disk, we assume that the action is added to the store after it was ACK with // Fleet. The store is not threadsafe. +// ATTN!!!: THE actionStore is deprecated, please use and extend the stateStore instead. The actionStore will be eventually removed. type actionStore struct { log *logger.Logger store storeLoad @@ -148,42 +148,3 @@ type actionUnenrollSerializer struct { // Add a guards between the serializer structs and the original struct. var _ actionUnenrollSerializer = actionUnenrollSerializer(fleetapi.ActionUnenroll{}) - -// actionStoreAcker wraps an existing acker and will send any acked event to the action store, -// its up to the action store to decide if we need to persist the event for future replay or just -// discard the event. -type actionStoreAcker struct { - acker fleetAcker - store *actionStore -} - -func (a *actionStoreAcker) Ack(ctx context.Context, action fleetapi.Action) error { - if err := a.acker.Ack(ctx, action); err != nil { - return err - } - a.store.Add(action) - return a.store.Save() -} - -func (a *actionStoreAcker) Commit(ctx context.Context) error { - return a.acker.Commit(ctx) -} - -func newActionStoreAcker(acker fleetAcker, store *actionStore) *actionStoreAcker { - return &actionStoreAcker{acker: acker, store: store} -} - -func replayActions( - log *logger.Logger, - dispatcher dispatcher, - acker fleetAcker, - actions ...action, -) error { - log.Info("restoring current policy from disk") - - if err := dispatcher.Dispatch(acker, actions...); err != nil { - return err - } - - return nil -} diff --git a/x-pack/elastic-agent/pkg/agent/application/action_store_test.go b/x-pack/elastic-agent/pkg/agent/application/action_store_test.go index f2691d66db68..cc5aa47ebca6 100644 --- a/x-pack/elastic-agent/pkg/agent/application/action_store_test.go +++ b/x-pack/elastic-agent/pkg/agent/application/action_store_test.go @@ -5,7 +5,6 @@ package application import ( - "context" "io/ioutil" "os" "path/filepath" @@ -84,21 +83,4 @@ func TestActionStore(t *testing.T) { require.Equal(t, ActionPolicyChange, actions[0]) })) - - t.Run("when we ACK we save to disk", - withFile(func(t *testing.T, file string) { - ActionPolicyChange := &fleetapi.ActionPolicyChange{ - ActionID: "abc123", - } - - s := storage.NewDiskStore(file) - store, err := newActionStore(log, s) - require.NoError(t, err) - - acker := newActionStoreAcker(&testAcker{}, store) - require.Equal(t, 0, len(store.Actions())) - - require.NoError(t, acker.Ack(context.Background(), ActionPolicyChange)) - require.Equal(t, 1, len(store.Actions())) - })) } diff --git a/x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go b/x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go index 8facdfc4ed15..0ec71d7a5fa7 100644 --- a/x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go +++ b/x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go @@ -77,6 +77,7 @@ type fleetGateway struct { unauthCounter int statusController status.Controller statusReporter status.Reporter + stateStore *stateStore } func newFleetGateway( @@ -88,6 +89,7 @@ func newFleetGateway( r fleetReporter, acker fleetAcker, statusController status.Controller, + stateStore *stateStore, ) (*fleetGateway, error) { scheduler := scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter) @@ -102,6 +104,7 @@ func newFleetGateway( r, acker, statusController, + stateStore, ) } @@ -116,6 +119,7 @@ func newFleetGatewayWithScheduler( r fleetReporter, acker fleetAcker, statusController status.Controller, + stateStore *stateStore, ) (*fleetGateway, error) { // Backoff implementation doesn't support the using context as the shutdown mechanism. @@ -140,6 +144,7 @@ func newFleetGatewayWithScheduler( acker: acker, statusReporter: statusController.Register("gateway"), statusController: statusController, + stateStore: stateStore, }, nil } @@ -209,9 +214,16 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, f.log.Error(errors.New("failed to load metadata", err)) } + // retrieve ack token from the store + ackToken := f.stateStore.AckToken() + if ackToken != "" { + f.log.Debug("using previously saved ack token: %v", ackToken) + } + // checkin cmd := fleetapi.NewCheckinCmd(f.agentInfo, f.client) req := &fleetapi.CheckinRequest{ + AckToken: ackToken, Events: ee, Metadata: ecsMeta, Status: f.statusController.StatusString(), @@ -236,6 +248,15 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, return nil, err } + // Save the latest ackToken + if resp.AckToken != "" { + f.stateStore.SetAckToken(resp.AckToken) + serr := f.stateStore.Save() + if serr != nil { + f.log.Errorf("failed to save the ack token, err: %v", serr) + } + } + // ack events so they are dropped from queue ack() return resp, nil diff --git a/x-pack/elastic-agent/pkg/agent/application/fleet_gateway_test.go b/x-pack/elastic-agent/pkg/agent/application/fleet_gateway_test.go index f57255c980a1..23b4eafde548 100644 --- a/x-pack/elastic-agent/pkg/agent/application/fleet_gateway_test.go +++ b/x-pack/elastic-agent/pkg/agent/application/fleet_gateway_test.go @@ -20,6 +20,8 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" repo "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter" fleetreporter "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter/fleet" @@ -116,6 +118,9 @@ func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGat ctx, cancel := context.WithCancel(context.Background()) + stateStore, err := newStateStore(log, storage.NewDiskStore(info.AgentStateStoreFile())) + require.NoError(t, err) + gateway, err := newFleetGatewayWithScheduler( ctx, log, @@ -127,6 +132,7 @@ func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGat rep, newNoopAcker(), &noopController{}, + stateStore, ) go gateway.Start() @@ -245,6 +251,9 @@ func TestFleetGateway(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) log, _ := logger.New("tst") + stateStore, err := newStateStore(log, storage.NewDiskStore(info.AgentStateStoreFile())) + require.NoError(t, err) + gateway, err := newFleetGatewayWithScheduler( ctx, log, @@ -256,6 +265,7 @@ func TestFleetGateway(t *testing.T) { getReporter(agentInfo, log, t), newNoopAcker(), &noopController{}, + stateStore, ) go gateway.Start() @@ -331,6 +341,10 @@ func TestFleetGateway(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) log, _ := logger.New("tst") + + stateStore, err := newStateStore(log, storage.NewDiskStore(info.AgentStateStoreFile())) + require.NoError(t, err) + gateway, err := newFleetGatewayWithScheduler( ctx, log, @@ -345,6 +359,7 @@ func TestFleetGateway(t *testing.T) { getReporter(agentInfo, log, t), newNoopAcker(), &noopController{}, + stateStore, ) require.NoError(t, err) diff --git a/x-pack/elastic-agent/pkg/agent/application/handler_action_application.go b/x-pack/elastic-agent/pkg/agent/application/handler_action_application.go new file mode 100644 index 000000000000..56b5ee3499c6 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/application/handler_action_application.go @@ -0,0 +1,31 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package application + +import ( + "context" + "fmt" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" +) + +type handlerAppAction struct { + log *logger.Logger +} + +func (h *handlerAppAction) Handle(ctx context.Context, a action, acker fleetAcker) error { + h.log.Debugf("handlerAppAction: action '%+v' received", a) + action, ok := a.(*fleetapi.ActionApp) + if !ok { + return fmt.Errorf("invalid type, expected ActionApp and received %T", a) + } + + _ = action + + // TODO: handle app action + + return nil +} diff --git a/x-pack/elastic-agent/pkg/agent/application/handler_action_unenroll.go b/x-pack/elastic-agent/pkg/agent/application/handler_action_unenroll.go index da33f2001ff8..83dca329342e 100644 --- a/x-pack/elastic-agent/pkg/agent/application/handler_action_unenroll.go +++ b/x-pack/elastic-agent/pkg/agent/application/handler_action_unenroll.go @@ -16,11 +16,11 @@ import ( // After running Unenroll agent is in idle state, non managed non standalone. // For it to be operational again it needs to be either enrolled or reconfigured. type handlerUnenroll struct { - log *logger.Logger - emitter emitterFunc - dispatcher programsDispatcher - closers []context.CancelFunc - actionStore *actionStore + log *logger.Logger + emitter emitterFunc + dispatcher programsDispatcher + closers []context.CancelFunc + stateStore *stateStore } func (h *handlerUnenroll) Handle(ctx context.Context, a action, acker fleetAcker) error { @@ -44,10 +44,10 @@ func (h *handlerUnenroll) Handle(ctx context.Context, a action, acker fleetAcker if err := acker.Commit(ctx); err != nil { return err } - } else if h.actionStore != nil { + } else if h.stateStore != nil { // backup action for future start to avoid starting fleet gateway loop - h.actionStore.Add(a) - h.actionStore.Save() + h.stateStore.Add(a) + h.stateStore.Save() } // close fleet gateway loop diff --git a/x-pack/elastic-agent/pkg/agent/application/info/agent_id.go b/x-pack/elastic-agent/pkg/agent/application/info/agent_id.go index f18fa542a251..a6551b685803 100644 --- a/x-pack/elastic-agent/pkg/agent/application/info/agent_id.go +++ b/x-pack/elastic-agent/pkg/agent/application/info/agent_id.go @@ -25,6 +25,7 @@ const agentInfoKey = "agent" // defaultAgentActionStoreFile is the file that will contains the action that can be replayed after restart. const defaultAgentActionStoreFile = "action_store.yml" +const defaultAgentStateStoreFile = "state.yml" const defaultLogLevel = "info" @@ -43,11 +44,16 @@ func AgentConfigFile() string { return filepath.Join(paths.Config(), defaultAgentConfigFile) } -// AgentActionStoreFile is the file that will contains the action that can be replayed after restart. +// AgentActionStoreFile is the file that contains the action that can be replayed after restart. func AgentActionStoreFile() string { return filepath.Join(paths.Home(), defaultAgentActionStoreFile) } +// AgentStateStoreFile is the file that contains the persisted state of the agent including the action that can be replayed after restart. +func AgentStateStoreFile() string { + return filepath.Join(paths.Home(), defaultAgentStateStoreFile) +} + // updateLogLevel updates log level and persists it to disk. func updateLogLevel(level string) error { ai, err := loadAgentInfo(false, defaultLogLevel) diff --git a/x-pack/elastic-agent/pkg/agent/application/inspect_config_cmd.go b/x-pack/elastic-agent/pkg/agent/application/inspect_config_cmd.go index f1bd2893bf00..4ea725898f27 100644 --- a/x-pack/elastic-agent/pkg/agent/application/inspect_config_cmd.go +++ b/x-pack/elastic-agent/pkg/agent/application/inspect_config_cmd.go @@ -100,12 +100,12 @@ func loadFleetConfig(cfg *config.Config) (map[string]interface{}, error) { return nil, err } - as, err := newActionStore(log, storage.NewDiskStore(info.AgentActionStoreFile())) + stateStore, err := newStateStoreWithMigration(log, info.AgentActionStoreFile(), info.AgentStateStoreFile()) if err != nil { return nil, err } - for _, c := range as.Actions() { + for _, c := range stateStore.Actions() { cfgChange, ok := c.(*fleetapi.ActionPolicyChange) if !ok { continue diff --git a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go index fb3bc7e897d4..b4b6e9ecb370 100644 --- a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go @@ -58,7 +58,7 @@ type Managed struct { gateway *fleetGateway router *router srv *server.Server - as *actionStore + stateStore *stateStore upgrader *upgrade.Upgrader } @@ -189,13 +189,13 @@ func newManaged( batchedAcker := newLazyAcker(acker) - // Create the action store that will persist the last good policy change on disk. - actionStore, err := newActionStore(log, storage.NewDiskStore(info.AgentActionStoreFile())) + // Create the state store that will persist the last good policy change on disk. + stateStore, err := newStateStoreWithMigration(log, info.AgentActionStoreFile(), info.AgentStateStoreFile()) if err != nil { return nil, errors.New(err, fmt.Sprintf("fail to read action store '%s'", info.AgentActionStoreFile())) } - managedApplication.as = actionStore - actionAcker := newActionStoreAcker(batchedAcker, actionStore) + managedApplication.stateStore = stateStore + actionAcker := newStateStoreActionAcker(batchedAcker, stateStore) actionDispatcher, err := newActionDispatcher(managedApplication.bgContext, log, &handlerDefault{log: log}) if err != nil { @@ -227,11 +227,11 @@ func newManaged( actionDispatcher.MustRegister( &fleetapi.ActionUnenroll{}, &handlerUnenroll{ - log: log, - emitter: emit, - dispatcher: router, - closers: []context.CancelFunc{managedApplication.cancelCtxFn}, - actionStore: actionStore, + log: log, + emitter: emit, + dispatcher: router, + closers: []context.CancelFunc{managedApplication.cancelCtxFn}, + stateStore: stateStore, }, ) @@ -252,12 +252,19 @@ func newManaged( }, ) + actionDispatcher.MustRegister( + &fleetapi.ActionApp{}, + &handlerAppAction{ + log: log, + }, + ) + actionDispatcher.MustRegister( &fleetapi.ActionUnknown{}, &handlerUnknown{log: log}, ) - actions := actionStore.Actions() + actions := stateStore.Actions() if len(actions) > 0 && !managedApplication.wasUnenrolled() { // TODO(ph) We will need an improvement on fleet, if there is an error while dispatching a @@ -277,6 +284,7 @@ func newManaged( fleetR, actionAcker, statusController, + stateStore, ) if err != nil { return nil, err @@ -320,7 +328,7 @@ func (m *Managed) AgentInfo() *info.AgentInfo { } func (m *Managed) wasUnenrolled() bool { - actions := m.as.Actions() + actions := m.stateStore.Actions() for _, a := range actions { if a.Type() == "UNENROLL" { return true diff --git a/x-pack/elastic-agent/pkg/agent/application/state_store.go b/x-pack/elastic-agent/pkg/agent/application/state_store.go new file mode 100644 index 000000000000..81d3f901469c --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/application/state_store.go @@ -0,0 +1,303 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package application + +import ( + "context" + "fmt" + "io" + "sync" + + yaml "gopkg.in/yaml.v2" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" +) + +// stateStore is a combined agent state storage initially derived from the former actionStore +// and modified to allow persistence of additional agent specific state information. +// The following is the original actionStore implementation description: +// receives multiples actions to persist to disk, the implementation of the store only +// take care of action policy change every other action are discarded. The store will only keep the +// last good action on disk, we assume that the action is added to the store after it was ACK with +// Fleet. The store is not threadsafe. +type stateStore struct { + log *logger.Logger + store storeLoad + dirty bool + state stateT + + mx sync.RWMutex +} + +type stateT struct { + action action + ackToken string +} + +// Combined yml serializer for the ActionPolicyChange and ActionUnenroll +type actionSerializer struct { + ID string `yaml:"action_id"` + Type string `yaml:"action_type"` + Policy map[string]interface{} `yaml:"policy,omitempty"` + IsDetected *bool `yaml:"is_detected,omitempty"` +} + +type stateSerializer struct { + Action *actionSerializer `yaml:"action,omitempty"` + AckToken string `yaml:"ack_token,omitempty"` +} + +func migrateStateStore(log *logger.Logger, actionStorePath, stateStorePath string) (err error) { + log = log.Named("state_migration") + actionDiskStore := storage.NewDiskStore(actionStorePath) + stateDiskStore := storage.NewDiskStore(stateStorePath) + + stateStoreExits, err := stateDiskStore.Exists() + if err != nil { + log.With() + log.Errorf("failed to check if state store %s exists: %v", stateStorePath, err) + return err + } + + // do not migrate if the state store already exists + if stateStoreExits { + log.Debugf("state store %s already exists", stateStorePath) + return nil + } + + actionStoreExits, err := actionDiskStore.Exists() + if err != nil { + log.Errorf("failed to check if action store %s exists: %v", actionStorePath, err) + return err + } + + // delete the actions store file upon successful migration + defer func() { + if err == nil && actionStoreExits { + err = actionDiskStore.Delete() + if err != nil { + log.Errorf("failed to delete action store %s exists: %v", actionStorePath, err) + } + } + }() + + // nothing to migrate if the action store doesn't exists + if !actionStoreExits { + log.Debugf("action store %s doesn't exists, nothing to migrate", actionStorePath) + return nil + } + + actionStore, err := newActionStore(log, actionDiskStore) + if err != nil { + log.Errorf("failed to create action store %s: %v", actionStorePath, err) + return err + } + + // no actions stored nothing to migrate + if len(actionStore.Actions()) == 0 { + log.Debugf("no actions stored in the action store %s, nothing to migrate", actionStorePath) + return nil + } + + stateStore, err := newStateStore(log, stateDiskStore) + if err != nil { + return err + } + + // set actions from the action store to the state store + stateStore.Add(actionStore.Actions()[0]) + + err = stateStore.Save() + if err != nil { + log.Debugf("failed to save agent state store %s, err: %v", stateStorePath, err) + } + return err +} + +func newStateStoreWithMigration(log *logger.Logger, actionStorePath, stateStorePath string) (*stateStore, error) { + err := migrateStateStore(log, actionStorePath, stateStorePath) + if err != nil { + return nil, err + } + + return newStateStore(log, storage.NewDiskStore(stateStorePath)) +} + +func newStateStore(log *logger.Logger, store storeLoad) (*stateStore, error) { + // If the store exists we will read it, if any errors is returned we assume we do not have anything + // persisted and we return an empty store. + reader, err := store.Load() + if err != nil { + return &stateStore{log: log, store: store}, nil + } + defer reader.Close() + + var sr stateSerializer + + dec := yaml.NewDecoder(reader) + err = dec.Decode(&sr) + if err == io.EOF { + return &stateStore{ + log: log, + store: store, + }, nil + } + + if err != nil { + return nil, err + } + + state := stateT{ + ackToken: sr.AckToken, + } + + if sr.Action != nil { + if sr.Action.IsDetected != nil { + state.action = &fleetapi.ActionUnenroll{ + ActionID: sr.Action.ID, + ActionType: sr.Action.Type, + IsDetected: *sr.Action.IsDetected, + } + } else { + state.action = &fleetapi.ActionPolicyChange{ + ActionID: sr.Action.ID, + ActionType: sr.Action.Type, + Policy: sr.Action.Policy, + } + } + } + + return &stateStore{ + log: log, + store: store, + state: state, + }, nil +} + +// Add is only taking care of ActionPolicyChange for now and will only keep the last one it receive, +// any other type of action will be silently ignored. +func (s *stateStore) Add(a action) { + s.mx.Lock() + defer s.mx.Unlock() + + switch v := a.(type) { + case *fleetapi.ActionPolicyChange, *fleetapi.ActionUnenroll: + // Only persist the action if the action is different. + if s.state.action != nil && s.state.action.ID() == v.ID() { + return + } + s.dirty = true + s.state.action = a + } +} + +// SetAckToken set ack token to the agent state +func (s *stateStore) SetAckToken(ackToken string) { + s.mx.Lock() + defer s.mx.Unlock() + + if s.state.ackToken == ackToken { + return + } + s.dirty = true + s.state.ackToken = ackToken +} + +func (s *stateStore) Save() error { + s.mx.Lock() + defer s.mx.Unlock() + + defer func() { s.dirty = false }() + if !s.dirty { + return nil + } + + var reader io.Reader + serialize := stateSerializer{ + AckToken: s.state.ackToken, + } + + if s.state.action != nil { + if apc, ok := s.state.action.(*fleetapi.ActionPolicyChange); ok { + serialize.Action = &actionSerializer{apc.ActionID, apc.ActionType, apc.Policy, nil} + } else if aun, ok := s.state.action.(*fleetapi.ActionUnenroll); ok { + serialize.Action = &actionSerializer{apc.ActionID, apc.ActionType, nil, &aun.IsDetected} + } else { + return fmt.Errorf("incompatible type, expected ActionPolicyChange and received %T", s.state.action) + } + } + + reader, err := yamlToReader(&serialize) + if err != nil { + return err + } + + if err := s.store.Save(reader); err != nil { + return err + } + s.log.Debugf("save state on disk : %+v", s.state) + return nil +} + +// Actions returns a slice of action to execute in order, currently only a action policy change is +// persisted. +func (s *stateStore) Actions() []action { + s.mx.RLock() + defer s.mx.RUnlock() + + if s.state.action == nil { + return []action{} + } + + return []action{s.state.action} +} + +// AckToken return the agent state persisted ack_token +func (s *stateStore) AckToken() string { + s.mx.RLock() + defer s.mx.RUnlock() + return s.state.ackToken +} + +// actionStoreAcker wraps an existing acker and will send any acked event to the action store, +// its up to the action store to decide if we need to persist the event for future replay or just +// discard the event. +type stateStoreActionAcker struct { + acker fleetAcker + store *stateStore +} + +func (a *stateStoreActionAcker) Ack(ctx context.Context, action fleetapi.Action) error { + if err := a.acker.Ack(ctx, action); err != nil { + return err + } + a.store.Add(action) + return a.store.Save() +} + +func (a *stateStoreActionAcker) Commit(ctx context.Context) error { + return a.acker.Commit(ctx) +} + +func newStateStoreActionAcker(acker fleetAcker, store *stateStore) *stateStoreActionAcker { + return &stateStoreActionAcker{acker: acker, store: store} +} + +func replayActions( + log *logger.Logger, + dispatcher dispatcher, + acker fleetAcker, + actions ...action, +) error { + log.Info("restoring current policy from disk") + + if err := dispatcher.Dispatch(acker, actions...); err != nil { + return err + } + + return nil +} diff --git a/x-pack/elastic-agent/pkg/agent/application/state_store_test.go b/x-pack/elastic-agent/pkg/agent/application/state_store_test.go new file mode 100644 index 000000000000..26ea1eaca683 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/application/state_store_test.go @@ -0,0 +1,170 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package application + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" +) + +func TestStateStore(t *testing.T) { + t.Run("ack token", func(t *testing.T) { + runTestStateStore(t, "") + }) + + t.Run("no ack token", func(t *testing.T) { + runTestStateStore(t, "czlV93YBwdkt5lYhBY7S") + }) +} + +func runTestStateStore(t *testing.T, ackToken string) { + log, _ := logger.New("state_store") + withFile := func(fn func(t *testing.T, file string)) func(*testing.T) { + return func(t *testing.T) { + dir, err := ioutil.TempDir("", "state-store") + require.NoError(t, err) + defer os.RemoveAll(dir) + file := filepath.Join(dir, "state.yml") + fn(t, file) + } + } + + t.Run("action returns empty when no action is saved on disk", + withFile(func(t *testing.T, file string) { + s := storage.NewDiskStore(file) + store, err := newStateStore(log, s) + require.NoError(t, err) + require.Equal(t, 0, len(store.Actions())) + })) + + t.Run("will discard silently unknown action", + withFile(func(t *testing.T, file string) { + actionPolicyChange := &fleetapi.ActionUnknown{ + ActionID: "abc123", + } + + s := storage.NewDiskStore(file) + store, err := newStateStore(log, s) + require.NoError(t, err) + + require.Equal(t, 0, len(store.Actions())) + store.Add(actionPolicyChange) + store.SetAckToken(ackToken) + err = store.Save() + require.NoError(t, err) + require.Equal(t, 0, len(store.Actions())) + require.Equal(t, ackToken, store.AckToken()) + })) + + t.Run("can save to disk known action type", + withFile(func(t *testing.T, file string) { + ActionPolicyChange := &fleetapi.ActionPolicyChange{ + ActionID: "abc123", + ActionType: "POLICY_CHANGE", + Policy: map[string]interface{}{ + "hello": "world", + }, + } + + s := storage.NewDiskStore(file) + store, err := newStateStore(log, s) + require.NoError(t, err) + + require.Equal(t, 0, len(store.Actions())) + store.Add(ActionPolicyChange) + store.SetAckToken(ackToken) + err = store.Save() + require.NoError(t, err) + require.Equal(t, 1, len(store.Actions())) + require.Equal(t, ackToken, store.AckToken()) + + s = storage.NewDiskStore(file) + store1, err := newStateStore(log, s) + require.NoError(t, err) + + actions := store1.Actions() + require.Equal(t, 1, len(actions)) + + require.Equal(t, ActionPolicyChange, actions[0]) + require.Equal(t, ackToken, store.AckToken()) + })) + + t.Run("when we ACK we save to disk", + withFile(func(t *testing.T, file string) { + ActionPolicyChange := &fleetapi.ActionPolicyChange{ + ActionID: "abc123", + } + + s := storage.NewDiskStore(file) + store, err := newStateStore(log, s) + require.NoError(t, err) + store.SetAckToken(ackToken) + + acker := newStateStoreActionAcker(&testAcker{}, store) + require.Equal(t, 0, len(store.Actions())) + + require.NoError(t, acker.Ack(context.Background(), ActionPolicyChange)) + require.Equal(t, 1, len(store.Actions())) + require.Equal(t, ackToken, store.AckToken()) + })) + + t.Run("migrate actions file does not exists", + withFile(func(t *testing.T, actionStorePath string) { + withFile(func(t *testing.T, stateStorePath string) { + err := migrateStateStore(log, actionStorePath, stateStorePath) + require.NoError(t, err) + stateStore, err := newStateStore(log, storage.NewDiskStore(stateStorePath)) + require.NoError(t, err) + stateStore.SetAckToken(ackToken) + require.Equal(t, 0, len(stateStore.Actions())) + require.Equal(t, ackToken, stateStore.AckToken()) + }) + })) + + t.Run("migrate", + withFile(func(t *testing.T, actionStorePath string) { + ActionPolicyChange := &fleetapi.ActionPolicyChange{ + ActionID: "abc123", + ActionType: "POLICY_CHANGE", + Policy: map[string]interface{}{ + "hello": "world", + }, + } + + actionStore, err := newActionStore(log, storage.NewDiskStore(actionStorePath)) + require.NoError(t, err) + + require.Equal(t, 0, len(actionStore.Actions())) + actionStore.Add(ActionPolicyChange) + err = actionStore.Save() + require.NoError(t, err) + require.Equal(t, 1, len(actionStore.Actions())) + + withFile(func(t *testing.T, stateStorePath string) { + err = migrateStateStore(log, actionStorePath, stateStorePath) + require.NoError(t, err) + + stateStore, err := newStateStore(log, storage.NewDiskStore(stateStorePath)) + require.NoError(t, err) + stateStore.SetAckToken(ackToken) + diff := cmp.Diff(actionStore.Actions(), stateStore.Actions()) + if diff != "" { + t.Error(diff) + } + require.Equal(t, ackToken, stateStore.AckToken()) + }) + })) + +} diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_retryable.go b/x-pack/elastic-agent/pkg/agent/operation/operation_retryable.go index f79eca617f82..6376492c2f29 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_retryable.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_retryable.go @@ -82,7 +82,7 @@ func (o *retryableOperations) runOnce(application Application) func(context.Cont o.logger.Debugf("running operation '%s' of the block '%s'", op.Name(), o.Name()) if err := op.Run(ctx, application); err != nil { - o.logger.Errorf("operation %s failed", op.Name()) + o.logger.Errorf("operation %s failed, err: %v", op.Name(), err) return err } } diff --git a/x-pack/elastic-agent/pkg/agent/storage/storage.go b/x-pack/elastic-agent/pkg/agent/storage/storage.go index 2435311486a3..2ff2d7250f13 100644 --- a/x-pack/elastic-agent/pkg/agent/storage/storage.go +++ b/x-pack/elastic-agent/pkg/agent/storage/storage.go @@ -153,6 +153,23 @@ func NewDiskStore(target string) *DiskStore { return &DiskStore{target: target} } +// Exists check if the store file exists on the disk +func (d *DiskStore) Exists() (bool, error) { + _, err := os.Stat(d.target) + if err != nil { + if os.IsNotExist(err) { + return false, nil + } + return false, err + } + return true, nil +} + +// Delete deletes the store file on the disk +func (d *DiskStore) Delete() error { + return os.Remove(d.target) +} + // Save accepts a persistedConfig and saved it to a target file, to do so we will // make a temporary files if the write is successful we are replacing the target file with the // original content. diff --git a/x-pack/elastic-agent/pkg/fleetapi/action.go b/x-pack/elastic-agent/pkg/fleetapi/action.go index 211b9199f2f9..d836aa801c23 100644 --- a/x-pack/elastic-agent/pkg/fleetapi/action.go +++ b/x-pack/elastic-agent/pkg/fleetapi/action.go @@ -21,6 +21,8 @@ const ( ActionTypePolicyChange = "POLICY_CHANGE" // ActionTypeSettings specifies change of agent settings. ActionTypeSettings = "SETTINGS" + // ActionTypeApplication specifies agent action. + ActionTypeApplication = "APP_ACTION" ) // Action base interface for all the implemented action from the fleet API. @@ -154,6 +156,16 @@ type ActionSettings struct { LogLevel string `json:"log_level"` } +// ID returns the ID of the Action. +func (a *ActionSettings) ID() string { + return a.ActionID +} + +// Type returns the type of the Action. +func (a *ActionSettings) Type() string { + return a.ActionType +} + func (a *ActionSettings) String() string { var s strings.Builder s.WriteString("action_id: ") @@ -165,25 +177,45 @@ func (a *ActionSettings) String() string { return s.String() } -// Type returns the type of the Action. -func (a *ActionSettings) Type() string { - return a.ActionType +// ActionApp is the application action request. +type ActionApp struct { + ActionID string + ActionType string + Application string + Data json.RawMessage +} + +func (a *ActionApp) String() string { + var s strings.Builder + s.WriteString("action_id: ") + s.WriteString(a.ActionID) + s.WriteString(", type: ") + s.WriteString(a.ActionType) + s.WriteString(", application: ") + s.WriteString(a.Application) + return s.String() } // ID returns the ID of the Action. -func (a *ActionSettings) ID() string { +func (a *ActionApp) ID() string { return a.ActionID } +// Type returns the type of the Action. +func (a *ActionApp) Type() string { + return a.ActionType +} + // Actions is a list of Actions to executes and allow to unmarshal heterogenous action type. type Actions []Action // UnmarshalJSON takes every raw representation of an action and try to decode them. func (a *Actions) UnmarshalJSON(data []byte) error { type r struct { - ActionType string `json:"type"` - ActionID string `json:"id"` - Data json.RawMessage `json:"data"` + ActionType string `json:"type"` + Application string `json:"application"` + ActionID string `json:"id"` + Data json.RawMessage `json:"data"` } var responses []r @@ -209,6 +241,13 @@ func (a *Actions) UnmarshalJSON(data []byte) error { "fail to decode POLICY_CHANGE action", errors.TypeConfig) } + case ActionTypeApplication: + action = &ActionApp{ + ActionID: response.ActionID, + ActionType: response.ActionType, + Application: response.Application, + Data: response.Data, + } case ActionTypeUnenroll: action = &ActionUnenroll{ ActionID: response.ActionID, diff --git a/x-pack/elastic-agent/pkg/fleetapi/checkin_cmd.go b/x-pack/elastic-agent/pkg/fleetapi/checkin_cmd.go index 19c936ed79c5..79bcb39d40b0 100644 --- a/x-pack/elastic-agent/pkg/fleetapi/checkin_cmd.go +++ b/x-pack/elastic-agent/pkg/fleetapi/checkin_cmd.go @@ -22,6 +22,7 @@ const checkingPath = "/api/fleet/agents/%s/checkin" // CheckinRequest consists of multiple events reported to fleet ui. type CheckinRequest struct { Status string `json:"status"` + AckToken string `json:"ack_token,omitempty"` Events []SerializableEvent `json:"events"` Metadata *info.ECSMeta `json:"local_metadata,omitempty"` } @@ -49,7 +50,8 @@ func (e *CheckinRequest) Validate() error { // CheckinResponse is the response send back from the server which contains all the action that // need to be executed or proxy to running processes. type CheckinResponse struct { - Actions Actions `json:"actions"` + AckToken string `json:"ack_token"` + Actions Actions `json:"actions"` } // Validate validates the response send from the server.