Skip to content

Commit

Permalink
[Elastic Agent] Reload fleet.kibana.hosts from policy change (elastic…
Browse files Browse the repository at this point in the history
…#21599)

* Update the connected client for kibana from policy change.

* Fix vet.

* Add changelog.

* Add protocol compare.

* Rollback protocol and hosts on failure.
  • Loading branch information
blakerouse authored Oct 7, 2020
1 parent 7028e2c commit 1891717
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 15 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@
- Send updating state {pull}21461[21461]
- Add `elastic.agent.id` and `elastic.agent.version` to published events from filebeat and metricbeat {pull}21543[21543]
- Add `upgrade` subcommand to perform upgrade of installed Elastic Agent {pull}21425[21425]
- Update `fleet.yml` and Kibana hosts when a policy change updates the Kibana hosts {pull}21599[21599]
4 changes: 4 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/fleet_acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func newActionAcker(
}, nil
}

func (f *actionAcker) SetClient(client clienter) {
f.client = client
}

func (f *actionAcker) Ack(ctx context.Context, action fleetapi.Action) error {
// checkin
agentID := f.agentInfo.AgentID()
Expand Down
4 changes: 4 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,7 @@ func (f *fleetGateway) stop() {
close(f.done)
f.wg.Wait()
}

func (f *fleetGateway) SetClient(client clienter) {
f.client = client
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,36 @@
package application

import (
"bytes"
"context"
"fmt"
"io"
"sort"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"

"gopkg.in/yaml.v2"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/kibana"
)

type clientSetter interface {
SetClient(clienter)
}

type handlerPolicyChange struct {
log *logger.Logger
emitter emitterFunc
log *logger.Logger
emitter emitterFunc
agentInfo *info.AgentInfo
config *configuration.Configuration
store storage.Store
setters []clientSetter
}

func (h *handlerPolicyChange) Handle(ctx context.Context, a action, acker fleetAcker) error {
Expand All @@ -31,9 +50,93 @@ func (h *handlerPolicyChange) Handle(ctx context.Context, a action, acker fleetA
}

h.log.Debugf("handlerPolicyChange: emit configuration for action %+v", a)
err = h.handleKibanaHosts(c)
if err != nil {
return err
}
if err := h.emitter(c); err != nil {
return err
}

return acker.Ack(ctx, action)
}

func (h *handlerPolicyChange) handleKibanaHosts(c *config.Config) (err error) {
cfg, err := configuration.NewFromConfig(c)
if err != nil {
return errors.New(err, "could not parse the configuration from the policy", errors.TypeConfig)
}
if kibanaEqual(h.config.Fleet.Kibana, cfg.Fleet.Kibana) {
// already the same hosts
return nil
}

// only set protocol/hosts as that is all Fleet currently sends
prevProtocol := h.config.Fleet.Kibana.Protocol
prevHosts := h.config.Fleet.Kibana.Hosts
h.config.Fleet.Kibana.Protocol = cfg.Fleet.Kibana.Protocol
h.config.Fleet.Kibana.Hosts = cfg.Fleet.Kibana.Hosts

// rollback on failure
defer func() {
if err != nil {
h.config.Fleet.Kibana.Protocol = prevProtocol
h.config.Fleet.Kibana.Hosts = prevHosts
}
}()

client, err := fleetapi.NewAuthWithConfig(h.log, h.config.Fleet.AccessAPIKey, h.config.Fleet.Kibana)
if err != nil {
return errors.New(
err, "fail to create API client with updated hosts",
errors.TypeNetwork, errors.M("hosts", h.config.Fleet.Kibana.Hosts))
}
reader, err := fleetToReader(h.agentInfo, h.config)
if err != nil {
return errors.New(
err, "fail to persist updated API client hosts",
errors.TypeUnexpected, errors.M("hosts", h.config.Fleet.Kibana.Hosts))
}
err = h.store.Save(reader)
if err != nil {
return errors.New(
err, "fail to persist updated API client hosts",
errors.TypeFilesystem, errors.M("hosts", h.config.Fleet.Kibana.Hosts))
}
for _, setter := range h.setters {
setter.SetClient(client)
}
return nil
}

func kibanaEqual(k1 *kibana.Config, k2 *kibana.Config) bool {
if k1.Protocol != k2.Protocol {
return false
}

sort.Strings(k1.Hosts)
sort.Strings(k2.Hosts)
if len(k1.Hosts) != len(k2.Hosts) {
return false
}
for i, v := range k1.Hosts {
if v != k2.Hosts[i] {
return false
}
}
return true
}

func fleetToReader(agentInfo *info.AgentInfo, cfg *configuration.Configuration) (io.Reader, error) {
configToStore := map[string]interface{}{
"fleet": cfg.Fleet,
"agent": map[string]interface{}{
"id": agentInfo.AgentID(),
},
}
data, err := yaml.Marshal(configToStore)
if err != nil {
return nil, err
}
return bytes.NewReader(data), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
"sync"
"testing"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -31,6 +35,8 @@ func (m *mockEmitter) Emitter(policy *config.Config) error {
func TestPolicyChange(t *testing.T) {
log, _ := logger.New("")
ack := newNoopAcker()
agentInfo, _ := info.NewAgentInfo()
nullStore := &storage.NullStore{}

t.Run("Receive a config change and successfully emits a raw configuration", func(t *testing.T) {
emitter := &mockEmitter{}
Expand All @@ -42,7 +48,14 @@ func TestPolicyChange(t *testing.T) {
Policy: conf,
}

handler := &handlerPolicyChange{log: log, emitter: emitter.Emitter}
cfg := configuration.DefaultConfiguration()
handler := &handlerPolicyChange{
log: log,
emitter: emitter.Emitter,
agentInfo: agentInfo,
config: cfg,
store: nullStore,
}

err := handler.Handle(context.Background(), action, ack)
require.NoError(t, err)
Expand All @@ -60,7 +73,14 @@ func TestPolicyChange(t *testing.T) {
Policy: conf,
}

handler := &handlerPolicyChange{log: log, emitter: emitter.Emitter}
cfg := configuration.DefaultConfiguration()
handler := &handlerPolicyChange{
log: log,
emitter: emitter.Emitter,
agentInfo: agentInfo,
config: cfg,
store: nullStore,
}

err := handler.Handle(context.Background(), action, ack)
require.Error(t, err)
Expand All @@ -69,6 +89,9 @@ func TestPolicyChange(t *testing.T) {

func TestPolicyAcked(t *testing.T) {
log, _ := logger.New("")
agentInfo, _ := info.NewAgentInfo()
nullStore := &storage.NullStore{}

t.Run("Config change should not ACK on error", func(t *testing.T) {
tacker := &testAcker{}

Expand All @@ -83,7 +106,14 @@ func TestPolicyAcked(t *testing.T) {
Policy: config,
}

handler := &handlerPolicyChange{log: log, emitter: emitter.Emitter}
cfg := configuration.DefaultConfiguration()
handler := &handlerPolicyChange{
log: log,
emitter: emitter.Emitter,
agentInfo: agentInfo,
config: cfg,
store: nullStore,
}

err := handler.Handle(context.Background(), action, tacker)
require.Error(t, err)
Expand All @@ -105,7 +135,14 @@ func TestPolicyAcked(t *testing.T) {
Policy: config,
}

handler := &handlerPolicyChange{log: log, emitter: emitter.Emitter}
cfg := configuration.DefaultConfiguration()
handler := &handlerPolicyChange{
log: log,
emitter: emitter.Emitter,
agentInfo: agentInfo,
config: cfg,
store: nullStore,
}

err := handler.Handle(context.Background(), action, tacker)
require.NoError(t, err)
Expand Down
16 changes: 12 additions & 4 deletions x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,17 @@ func newManaged(
acker,
combinedReporter)

policyChanger := &handlerPolicyChange{
log: log,
emitter: emit,
agentInfo: agentInfo,
config: cfg,
store: store,
setters: []clientSetter{acker},
}
actionDispatcher.MustRegister(
&fleetapi.ActionPolicyChange{},
&handlerPolicyChange{
log: log,
emitter: emit,
},
policyChanger,
)

actionDispatcher.MustRegister(
Expand Down Expand Up @@ -264,6 +269,9 @@ func newManaged(
if err != nil {
return nil, err
}
// add the gateway to setters, so the gateway can be updated
// when the hosts for Kibana are updated by the policy.
policyChanger.setters = append(policyChanger.setters, gateway)

managedApplication.gateway = gateway
return managedApplication, nil
Expand Down
12 changes: 10 additions & 2 deletions x-pack/elastic-agent/pkg/agent/application/managed_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ import (
"encoding/json"
"testing"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
Expand All @@ -34,18 +37,23 @@ func TestManagedModeRouting(t *testing.T) {
log, _ := logger.New("")
router, _ := newRouter(log, streamFn)
agentInfo, _ := info.NewAgentInfo()
nullStore := &storage.NullStore{}
composableCtrl, _ := composable.New(log, nil)
emit, err := emitter(ctx, log, agentInfo, composableCtrl, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}})
require.NoError(t, err)

actionDispatcher, err := newActionDispatcher(ctx, log, &handlerDefault{log: log})
require.NoError(t, err)

cfg := configuration.DefaultConfiguration()
actionDispatcher.MustRegister(
&fleetapi.ActionPolicyChange{},
&handlerPolicyChange{
log: log,
emitter: emit,
log: log,
emitter: emit,
agentInfo: agentInfo,
config: cfg,
store: nullStore,
},
)

Expand Down
8 changes: 5 additions & 3 deletions x-pack/elastic-agent/pkg/agent/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (

const perms os.FileMode = 0600

type store interface {
// Store saves the io.Reader.
type Store interface {
// Save the io.Reader.
Save(io.Reader) error
}

Expand Down Expand Up @@ -62,12 +64,12 @@ type ReplaceOnSuccessStore struct {
target string
replaceWith []byte

wrapped store
wrapped Store
}

// NewReplaceOnSuccessStore takes a target file and a replacement content and will replace the target
// file content if the wrapped store execution is done without any error.
func NewReplaceOnSuccessStore(target string, replaceWith []byte, wrapped store) *ReplaceOnSuccessStore {
func NewReplaceOnSuccessStore(target string, replaceWith []byte, wrapped Store) *ReplaceOnSuccessStore {
return &ReplaceOnSuccessStore{
target: target,
replaceWith: replaceWith,
Expand Down

0 comments on commit 1891717

Please sign in to comment.