[Elastic Agent] Reload fleet.kibana.hosts from policy change #21599
Changes from 4 commits
@@ -5,17 +5,36 @@
package application

import (
    "bytes"
    "context"
    "fmt"
    "io"
    "sort"

    "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"

    "gopkg.in/yaml.v2"

    "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
    "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
    "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage"
    "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
    "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
    "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
    "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/kibana"
)

// clientSetter is implemented by components whose Fleet API client can be
// swapped at runtime when the Kibana connection settings change.
type clientSetter interface {
    SetClient(clienter)
}
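
For illustration, a minimal sketch of a component satisfying clientSetter; the type and field names below are hypothetical, not part of this PR:

    // hypotheticalGateway is an illustrative component that holds a Fleet API
    // client and allows the policy-change handler to replace it at runtime.
    type hypotheticalGateway struct {
        client clienter
    }

    // SetClient swaps in the newly created client; subsequent requests go to
    // the updated Kibana hosts.
    func (g *hypotheticalGateway) SetClient(c clienter) {
        g.client = c
    }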

// handlerPolicyChange applies POLICY_CHANGE actions from Fleet: it reloads
// the Kibana connection settings from the new policy, emits the updated
// configuration, and acks the action.
type handlerPolicyChange struct {
    log       *logger.Logger
    emitter   emitterFunc
    agentInfo *info.AgentInfo
    config    *configuration.Configuration
    store     storage.Store
    setters   []clientSetter
}

func (h *handlerPolicyChange) Handle(ctx context.Context, a action, acker fleetAcker) error {
@@ -31,9 +50,81 @@ func (h *handlerPolicyChange) Handle(ctx context.Context, a action, acker fleetAcker) error {
    }

    h.log.Debugf("handlerPolicyChange: emit configuration for action %+v", a)
    // Reconfigure the Kibana client before emitting or acking: the action is
    // only acked once a client built against the new hosts exists.
    err = h.handleKibanaHosts(c)
    if err != nil {
        return err
    }
    if err := h.emitter(c); err != nil {
        return err
    }

    return acker.Ack(ctx, action)
}

func (h *handlerPolicyChange) handleKibanaHosts(c *config.Config) error {
    cfg, err := configuration.NewFromConfig(c)
    if err != nil {
        return errors.New(err, "could not parse the configuration from the policy", errors.TypeConfig)
    }
    if kibanaEqual(h.config.Fleet.Kibana, cfg.Fleet.Kibana) {
        // already the same hosts
        return nil
    }
    // only set protocol/hosts as that is all Fleet currently sends
    h.config.Fleet.Kibana.Protocol = cfg.Fleet.Kibana.Protocol

Review thread on the line above:
Reviewer: In case client creation fails, we will end up with updated values in memory that do not correspond to the currently running client; we should probably do some rollback on error.
Author: Good call, I have added rollback of the previous values on failure.

    h.config.Fleet.Kibana.Hosts = cfg.Fleet.Kibana.Hosts
    client, err := fleetapi.NewAuthWithConfig(h.log, h.config.Fleet.AccessAPIKey, h.config.Fleet.Kibana)
    if err != nil {
        return errors.New(
            err, "fail to create API client with updated hosts",
            errors.TypeNetwork, errors.M("hosts", h.config.Fleet.Kibana.Hosts))
    }
    reader, err := fleetToReader(h.agentInfo, h.config)
    if err != nil {
        return errors.New(
            err, "fail to persist updated API client hosts",
            errors.TypeUnexpected, errors.M("hosts", h.config.Fleet.Kibana.Hosts))
    }
    err = h.store.Save(reader)
    if err != nil {
        return errors.New(
            err, "fail to persist updated API client hosts",
            errors.TypeFilesystem, errors.M("hosts", h.config.Fleet.Kibana.Hosts))
    }
    for _, setter := range h.setters {
        setter.SetClient(client)
    }
    return nil
}
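
The rollback the author mentions is not part of the four commits shown in this diff. A minimal sketch of what restoring the previous values on failure could look like, reusing only names that appear in the diff above; this is illustrative, not the code that was actually merged:

    // Hypothetical rollback: remember the running values before mutating the
    // in-memory config, and restore them if client creation fails so memory
    // keeps matching the client that is still in use.
    prevProtocol := h.config.Fleet.Kibana.Protocol
    prevHosts := h.config.Fleet.Kibana.Hosts
    h.config.Fleet.Kibana.Protocol = cfg.Fleet.Kibana.Protocol
    h.config.Fleet.Kibana.Hosts = cfg.Fleet.Kibana.Hosts

    client, err := fleetapi.NewAuthWithConfig(h.log, h.config.Fleet.AccessAPIKey, h.config.Fleet.Kibana)
    if err != nil {
        h.config.Fleet.Kibana.Protocol = prevProtocol
        h.config.Fleet.Kibana.Hosts = prevHosts
        return errors.New(
            err, "fail to create API client with updated hosts",
            errors.TypeNetwork, errors.M("hosts", cfg.Fleet.Kibana.Hosts))
    }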

// kibanaEqual reports whether two Kibana configs use the same protocol and
// the same set of hosts, ignoring host order. Note that it sorts both Hosts
// slices in place as a side effect.
func kibanaEqual(k1 *kibana.Config, k2 *kibana.Config) bool {
    if k1.Protocol != k2.Protocol {
        return false
    }

    sort.Strings(k1.Hosts)
    sort.Strings(k2.Hosts)
    if len(k1.Hosts) != len(k2.Hosts) {
        return false
    }
    for i, v := range k1.Hosts {
        if v != k2.Hosts[i] {
            return false
        }
    }
    return true
}
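
For illustration, how kibanaEqual treats host ordering. The literals below assume only the Protocol and Hosts fields of kibana.Config that this diff already uses:

    // Same protocol, same hosts in a different order: considered equal.
    a := &kibana.Config{Protocol: "https", Hosts: []string{"host2:5601", "host1:5601"}}
    b := &kibana.Config{Protocol: "https", Hosts: []string{"host1:5601", "host2:5601"}}
    equal := kibanaEqual(a, b) // true: hosts are compared after sorting
    _ = equal
    // Caveat: the call sorts a.Hosts and b.Hosts in place, so callers
    // observe the reordering.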

// fleetToReader serializes the fleet section of the configuration, together
// with the agent ID, to YAML so it can be handed to the store.
func fleetToReader(agentInfo *info.AgentInfo, cfg *configuration.Configuration) (io.Reader, error) {
    configToStore := map[string]interface{}{
        "fleet": cfg.Fleet,
        "agent": map[string]interface{}{
            "id": agentInfo.AgentID(),
        },
    }
    data, err := yaml.Marshal(configToStore)
    if err != nil {
        return nil, err
    }
    return bytes.NewReader(data), nil
}

A second review thread:
Reviewer: Maybe we should, here and in the acker, make sure the client is not changed while an execute/ack action is being performed; at least it can be misleading, e.g.
Author: Actually I think this flow is correct, because each action from Fleet is handled synchronously in the Agent. So when a policy change comes in with updated hosts, the action will not be Ack'd until this code is able to reconnect to Kibana using the new hosts information. fleetapi.NewAuthWithConfig ensures that the created client can communicate with Kibana, so the Ack will not happen until the updated client is created and set.
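
A schematic restatement of that guarantee; the loop below is a hypothetical, simplified stand-in for the Agent's gateway code, not the real implementation:

    // Actions are dispatched one at a time. Handle only reaches acker.Ack
    // after handleKibanaHosts has built (and thereby validated) a client
    // against the new hosts; on failure Handle returns early, the action
    // stays un-acked, and Fleet can deliver it again on a later check-in.
    for _, a := range actions {
        if err := handler.Handle(ctx, a, acker); err != nil {
            log.Errorf("failed to handle action: %v", err)
        }
    }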