Skip to content

Commit

Permalink
[Ingest Manager] Agent unenroll (#19507) (#19696)
Browse files Browse the repository at this point in the history
  • Loading branch information
michalpristas authored Jul 8, 2020
1 parent 8f5355d commit 2223653
Show file tree
Hide file tree
Showing 20 changed files with 213 additions and 25 deletions.
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,10 @@ github.com/aerospike/aerospike-client-go v1.27.1-0.20170612174108-0f3b54da6bdc/g
github.com/akavel/rsrc v0.8.0 h1:zjWn7ukO9Kc5Q62DOJCcxGpXC18RawVtYAGdz2aLlfw=
github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/andrewkroh/goja v0.0.0-20190128172624-dd2ac4456e20 h1:7rj9qZ63knnVo2ZeepYHvHuRdG76f3tRUTdIQDzRBeI=
github.com/andrewkroh/goja v0.0.0-20190128172624-dd2ac4456e20/go.mod h1:cI59GRkC2FRaFYtgbYEqMlgnnfvAwXzjojyZKXwklNg=
Expand Down Expand Up @@ -880,6 +882,7 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
43 changes: 33 additions & 10 deletions x-pack/elastic-agent/pkg/agent/application/action_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package application
import (
"context"
"fmt"
"io"

yaml "gopkg.in/yaml.v2"

Expand Down Expand Up @@ -53,7 +54,7 @@ func newActionStore(log *logger.Logger, store storeLoad) (*actionStore, error) {
// any other type of action will be silently ignored.
func (s *actionStore) Add(a action) {
switch v := a.(type) {
case *fleetapi.ActionConfigChange:
case *fleetapi.ActionConfigChange, *fleetapi.ActionUnenroll:
// Only persist the action if the action is different.
if s.action != nil && s.action.ID() == v.ID() {
return
Expand All @@ -69,16 +70,29 @@ func (s *actionStore) Save() error {
return nil
}

apc, ok := s.action.(*fleetapi.ActionConfigChange)
if !ok {
return fmt.Errorf("incompatible type, expected ActionPolicyChange and received %T", s.action)
}
var reader io.Reader
if apc, ok := s.action.(*fleetapi.ActionConfigChange); ok {
serialize := actionConfigChangeSerializer(*apc)

serialize := actionConfigChangeSerializer(*apc)
r, err := yamlToReader(&serialize)
if err != nil {
return err
}

reader, err := yamlToReader(&serialize)
if err != nil {
return err
reader = r
} else if aun, ok := s.action.(*fleetapi.ActionUnenroll); ok {
serialize := actionUnenrollSerializer(*aun)

r, err := yamlToReader(&serialize)
if err != nil {
return err
}

reader = r
}

if reader == nil {
return fmt.Errorf("incompatible type, expected ActionPolicyChange and received %T", s.action)
}

if err := s.store.Save(reader); err != nil {
Expand All @@ -98,7 +112,7 @@ func (s *actionStore) Actions() []action {
return []action{s.action}
}

// actionConfigChangeSerializer is a struct that add YAML serialization, I don't think serialization
// actionConfigChangeSerializer is a struct that adds a YAML serialization, I don't think serialization
// is a concern of the fleetapi package. I went this route so I don't have to do much refactoring.
//
// There are four ways to achieve the same results:
Expand All @@ -117,6 +131,15 @@ type actionConfigChangeSerializer struct {
// Add a guards between the serializer structs and the original struct.
var _ actionConfigChangeSerializer = actionConfigChangeSerializer(fleetapi.ActionConfigChange{})

// actionUnenrollSerializer is a struct that adds a YAML serialization,
type actionUnenrollSerializer struct {
ActionID string `yaml:"action_id"`
ActionType string `yaml:"action_type"`
}

// Add a guards between the serializer structs and the original struct.
var _ actionUnenrollSerializer = actionUnenrollSerializer(fleetapi.ActionUnenroll{})

// actionStoreAcker wraps an existing acker and will send any acked event to the action store,
// its up to the action store to decide if we need to persist the event for future replay or just
// discard the event.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application

import (
"context"
"fmt"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
)

// After running Unenroll agent is in idle state, non managed non standalone.
// For it to be operational again it needs to be either enrolled or reconfigured.
type handlerUnenroll struct {
log *logger.Logger
emitter emitterFunc
dispatcher programsDispatcher
closers []context.CancelFunc
}

func (h *handlerUnenroll) Handle(ctx context.Context, a action, acker fleetAcker) error {
h.log.Debugf("handlerUnenroll: action '%+v' received", a)
action, ok := a.(*fleetapi.ActionUnenroll)
if !ok {
return fmt.Errorf("invalid type, expected ActionUnenroll and received %T", a)
}

// Providing empty map will close all pipelines
noPrograms := make(map[routingKey][]program.Program)
h.dispatcher.Dispatch(a.ID(), noPrograms)

if err := acker.Ack(ctx, action); err != nil {
return err
}

// commit all acks before quitting.
if err := acker.Commit(ctx); err != nil {
return err
}

// close fleet gateway loop
for _, c := range h.closers {
c()
}

// clean action store
// if err := os.Remove(info.AgentActionStoreFile()); err != nil && !os.IsNotExist(err) {
// return errors.New(err, "failed to clear action store")
// }

return nil
}
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type emitterFunc func(*config.Config) error
// ConfigHandler is capable of handling config, perform actions at it, shutdown any long running process.
type ConfigHandler interface {
HandleConfig(configrequest.Request) error
Close() error
Shutdown()
}

Expand Down
31 changes: 30 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Managed struct {
gateway *fleetGateway
router *router
srv *server.Server
as *actionStore
}

func newManaged(
Expand Down Expand Up @@ -168,6 +169,7 @@ func newManaged(
if err != nil {
return nil, errors.New(err, fmt.Sprintf("fail to read action store '%s'", info.AgentActionStoreFile()))
}
managedApplication.as = actionStore
actionAcker := newActionStoreAcker(batchedAcker, actionStore)

actionDispatcher, err := newActionDispatcher(managedApplication.bgContext, log, &handlerDefault{log: log})
Expand All @@ -183,13 +185,24 @@ func newManaged(
},
)

actionDispatcher.MustRegister(
&fleetapi.ActionUnenroll{},
&handlerUnenroll{
log: log,
emitter: emit,
dispatcher: router,
closers: []context.CancelFunc{managedApplication.cancelCtxFn},
},
)

actionDispatcher.MustRegister(
&fleetapi.ActionUnknown{},
&handlerUnknown{log: log},
)

actions := actionStore.Actions()
if len(actions) > 0 {

if len(actions) > 0 && !managedApplication.wasUnenrolled() {
// TODO(ph) We will need an improvement on fleet, if there is an error while dispatching a
// persisted action on disk we should be able to ask Fleet to get the latest configuration.
// But at the moment this is not possible because the policy change was acked.
Expand Down Expand Up @@ -219,6 +232,11 @@ func newManaged(
// Start starts a managed elastic-agent.
func (m *Managed) Start() error {
m.log.Info("Agent is starting")
if m.wasUnenrolled() {
m.log.Warnf("agent was previously unenrolled. To reactivate please reconfigure or enroll again.")
return nil
}

m.gateway.Start()
return nil
}
Expand All @@ -236,3 +254,14 @@ func (m *Managed) Stop() error {
func (m *Managed) AgentInfo() *info.AgentInfo {
return m.agentInfo
}

func (m *Managed) wasUnenrolled() bool {
actions := m.as.Actions()
for _, a := range actions {
if a.Type() == "UNENROLL" {
return true
}
}

return false
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/filters"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
)
Expand Down Expand Up @@ -68,16 +69,16 @@ func testActions() ([]action, error) {
}

type mockStreamStore struct {
store []*configRequest
store []configrequest.Request
}

func newMockStreamStore() *mockStreamStore {
return &mockStreamStore{
store: make([]*configRequest, 0),
store: make([]configrequest.Request, 0),
}
}

func (m *mockStreamStore) Execute(cr *configRequest) error {
func (m *mockStreamStore) Execute(cr configrequest.Request) error {
m.store = append(m.store, cr)
return nil
}
Expand Down
9 changes: 4 additions & 5 deletions x-pack/elastic-agent/pkg/agent/application/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ package application
import (
"fmt"
"strings"
"time"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/sorted"
Expand All @@ -19,7 +21,7 @@ var defautlRK = "DEFAULT"
type routingKey = string

type stream interface {
Execute(*configRequest) error
Execute(configrequest.Request) error
Close() error
Shutdown()
}
Expand Down Expand Up @@ -73,10 +75,7 @@ func (r *router) Dispatch(id string, grpProg map[routingKey][]program.Program) e
return fmt.Errorf("could not find programs for routing key %s", rk)
}

req := &configRequest{
id: id,
programs: programs.([]program.Program),
}
req := configrequest.New(id, time.Now(), programs.([]program.Program))

r.log.Debugf(
"Streams %s need to run config with ID %s and programs: %s",
Expand Down
3 changes: 2 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)
Expand Down Expand Up @@ -199,7 +200,7 @@ func newMockStream(rk routingKey, notify notifyFunc) *mockStream {
}
}

func (m *mockStream) Execute(req *configRequest) error {
func (m *mockStream) Execute(req configrequest.Request) error {
m.event(executeOp, req)
return nil
}
Expand Down
5 changes: 3 additions & 2 deletions x-pack/elastic-agent/pkg/agent/application/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package application
import (
"context"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation"
operatorCfg "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config"
Expand All @@ -28,10 +29,10 @@ type operatorStream struct {
}

func (b *operatorStream) Close() error {
return b.configHandler.HandleConfig(&configRequest{})
return b.configHandler.Close()
}

func (b *operatorStream) Execute(cfg *configRequest) error {
func (b *operatorStream) Execute(cfg configrequest.Request) error {
return b.configHandler.HandleConfig(cfg)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application
package configrequest

import (
"strings"
Expand All @@ -19,6 +19,15 @@ type configRequest struct {
programs []program.Program
}

// New created a new Request.
func New(id string, createdAt time.Time, programs []program.Program) Request {
return &configRequest{
id: id,
createdAt: createdAt,
programs: programs,
}
}

func (c *configRequest) String() string {
names := c.ProgramNames()
return "[" + c.ShortID() + "] Config: " + strings.Join(names, ", ")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application
package configrequest

import (
"testing"
Expand Down
2 changes: 2 additions & 0 deletions x-pack/elastic-agent/pkg/agent/configrequest/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
// Request is the minimal interface a config request must have.
type Request interface {
ID() string
ShortID() string
CreatedAt() time.Time
Programs() []program.Program
ProgramNames() []string
}
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/operation/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) {
}

func (o *Operator) handleStopSidecar(s configrequest.Step) (result error) {
for _, step := range o.getMonitoringSteps(s) {
for _, step := range o.generateMonitoringSteps(s.Version, nil) {
p, _, err := getProgramFromStepWithTags(step, o.config.DownloadConfig, monitoringTags())
if err != nil {
return errors.New(err,
Expand Down
3 changes: 3 additions & 0 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ func (b *testMonitor) EnrichArgs(_ string, _ string, args []string, _ bool) []st
// Cleanup cleans up all drops.
func (b *testMonitor) Cleanup(string, string) error { return nil }

// Close closes the monitor.
func (b *testMonitor) Close() {}

// Prepare executes steps in order for monitoring to work correctly
func (b *testMonitor) Prepare(string, string, int, int) error { return nil }

Expand Down
Loading

0 comments on commit 2223653

Please sign in to comment.