Skip to content

Commit

Permalink
[Ingest Management] Agent supports capabilities definition (elastic#2…
Browse files Browse the repository at this point in the history
…3848) (elastic#24037)

[Ingest Management] Agent supports capabilities definition (elastic#23848)
  • Loading branch information
michalpristas authored Feb 15, 2021
1 parent 6db5806 commit 86a0bd8
Show file tree
Hide file tree
Showing 38 changed files with 2,847 additions and 10 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
- Add --staging option to enroll command {pull}20026[20026]
- Add `event.dataset` to all events {pull}20076[20076]
- Send datastreams fields {pull}20416[20416]
- Agent supports capabilities definition {pull}23848[23848]

[[release-notes-7.8.0]]
=== Elastic Agent version 7.8.0
Expand Down
19 changes: 18 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/capabilities"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
Expand Down Expand Up @@ -42,6 +43,7 @@ type emitterController struct {
router programsDispatcher
modifiers *configModifiers
reloadables []reloadable
caps capabilities.Capability

// state
lock sync.RWMutex
Expand All @@ -65,6 +67,20 @@ func (e *emitterController) Update(c *config.Config) error {
if err != nil {
return errors.New(err, "could not create the AST from the configuration", errors.TypeConfig)
}

if e.caps != nil {
var ok bool
updatedAst, err := e.caps.Apply(rawAst)
if err != nil {
return errors.New(err, "failed to apply capabilities")
}

rawAst, ok = updatedAst.(*transpiler.AST)
if !ok {
return errors.New("failed to transform object returned from capabilities to AST", errors.TypeConfig)
}
}

for _, filter := range e.modifiers.Filters {
if err := filter(e.logger, rawAst); err != nil {
return errors.New(err, "failed to filter configuration", errors.TypeConfig)
Expand Down Expand Up @@ -142,7 +158,7 @@ func (e *emitterController) update() error {
return e.router.Dispatch(ast.HashStr(), programsToRun)
}

func emitter(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, controller composable.Controller, router programsDispatcher, modifiers *configModifiers, reloadables ...reloadable) (emitterFunc, error) {
func emitter(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, controller composable.Controller, router programsDispatcher, modifiers *configModifiers, caps capabilities.Capability, reloadables ...reloadable) (emitterFunc, error) {
log.Debugf("Supported programs: %s", strings.Join(program.KnownProgramNames(), ", "))

init, _ := transpiler.NewVars(map[string]interface{}{})
Expand All @@ -154,6 +170,7 @@ func emitter(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo,
modifiers: modifiers,
reloadables: reloadables,
vars: []*transpiler.Vars{init},
caps: caps,
}
err := controller.Run(ctx, func(vars []*transpiler.Vars) {
ctrl.Set(vars)
Expand Down
6 changes: 6 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/info/agent_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
)

// defaultAgentConfigFile is a name of file used to store agent information
const defaultAgentCapabilitiesFile = "capabilities.yml"
const defaultAgentConfigFile = "fleet.yml"
const agentInfoKey = "agent"

Expand All @@ -44,6 +45,11 @@ func AgentConfigFile() string {
return filepath.Join(paths.Config(), defaultAgentConfigFile)
}

// AgentCapabilitiesPath is a name of file used to store agent capabilities
func AgentCapabilitiesPath() string {
return filepath.Join(paths.Config(), defaultAgentCapabilitiesFile)
}

// AgentActionStoreFile is the file that contains the action that can be replayed after restart.
func AgentActionStoreFile() string {
return filepath.Join(paths.Home(), defaultAgentActionStoreFile)
Expand Down
22 changes: 21 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/inspect_config_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/capabilities"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
)

Expand Down Expand Up @@ -118,7 +120,25 @@ func loadFleetConfig(cfg *config.Config) (map[string]interface{}, error) {
}

func printMapStringConfig(mapStr map[string]interface{}) error {
data, err := yaml.Marshal(mapStr)
l, err := newErrorLogger()
if err != nil {
return err
}
caps, err := capabilities.Load(info.AgentCapabilitiesPath(), l, status.NewController(l))
if err != nil {
return err
}

newCfg, err := caps.Apply(mapStr)
if err != nil {
return errors.New(err, "failed to apply capabilities")
}
newMap, ok := newCfg.(map[string]interface{})
if !ok {
return errors.New("config returned from capabilities has invalid type")
}

data, err := yaml.Marshal(newMap)
if err != nil {
return errors.New(err, "could not marshal to YAML")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/capabilities"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/noop"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status"
)

// InspectOutputCmd is an inspect subcommand that shows configurations of the agent.
Expand Down Expand Up @@ -207,13 +209,19 @@ func getProgramsFromConfig(log *logger.Logger, agentInfo *info.AgentInfo, cfg *c
modifiers.Filters = append(modifiers.Filters, injectFleet(cfg, sysInfo.Info(), agentInfo))
}

caps, err := capabilities.Load(info.AgentCapabilitiesPath(), log, status.NewController(log))
if err != nil {
return nil, err
}

emit, err := emitter(
ctx,
log,
agentInfo,
composableWaiter,
router,
modifiers,
caps,
monitor,
)
if err != nil {
Expand Down
10 changes: 9 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/capabilities"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
Expand Down Expand Up @@ -66,6 +67,11 @@ func newLocal(
agentInfo *info.AgentInfo,
) (*Local, error) {
statusController := &noopController{}
caps, err := capabilities.Load(info.AgentCapabilitiesPath(), log, statusController)
if err != nil {
return nil, err
}

cfg, err := configuration.NewFromConfig(rawConfig)
if err != nil {
return nil, err
Expand Down Expand Up @@ -120,6 +126,7 @@ func newLocal(
Decorators: []decoratorFunc{injectMonitoring},
Filters: []filterFunc{filters.StreamChecker},
},
caps,
monitor,
)
if err != nil {
Expand All @@ -145,7 +152,8 @@ func newLocal(
[]context.CancelFunc{localApplication.cancelCtxFn},
reexec,
newNoopAcker(),
reporter)
reporter,
caps)
uc.SetUpgrader(upgrader)

return localApplication, nil
Expand Down
10 changes: 9 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/capabilities"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
Expand Down Expand Up @@ -70,6 +71,11 @@ func newManaged(
agentInfo *info.AgentInfo,
) (*Managed, error) {
statusController := status.NewController(log)
caps, err := capabilities.Load(info.AgentCapabilitiesPath(), log, statusController)
if err != nil {
return nil, err
}

path := info.AgentConfigFile()

store := storage.NewDiskStore(path)
Expand Down Expand Up @@ -177,6 +183,7 @@ func newManaged(
Decorators: []decoratorFunc{injectMonitoring},
Filters: []filterFunc{filters.StreamChecker, injectFleet(config, sysInfo.Info(), agentInfo)},
},
caps,
monitor,
)
if err != nil {
Expand Down Expand Up @@ -209,7 +216,8 @@ func newManaged(
[]context.CancelFunc{managedApplication.cancelCtxFn},
reexec,
acker,
combinedReporter)
combinedReporter,
caps)

policyChanger := &handlerPolicyChange{
log: log,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestManagedModeRouting(t *testing.T) {
agentInfo, _ := info.NewAgentInfo()
nullStore := &storage.NullStore{}
composableCtrl, _ := composable.New(log, nil)
emit, err := emitter(ctx, log, agentInfo, composableCtrl, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}})
emit, err := emitter(ctx, log, agentInfo, composableCtrl, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}}, nil)
require.NoError(t, err)

actionDispatcher, err := newActionDispatcher(ctx, log, &handlerDefault{log: log})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ import (
type noopController struct{}

func (*noopController) Register(_ string) status.Reporter { return &noopReporter{} }
func (*noopController) Status() status.AgentStatus { return status.Healthy }
func (*noopController) UpdateStateID(_ string) {}
func (*noopController) StatusString() string { return "online" }
func (*noopController) RegisterWithPersistance(_ string, _ bool) status.Reporter {
return &noopReporter{}
}
func (*noopController) Status() status.AgentStatus { return status.Healthy }
func (*noopController) UpdateStateID(_ string) {}
func (*noopController) StatusString() string { return "online" }

type noopReporter struct{}

Expand Down
11 changes: 10 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/install"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/capabilities"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
Expand Down Expand Up @@ -48,6 +49,7 @@ type Upgrader struct {
acker acker
reporter stateReporter
upgradeable bool
caps capabilities.Capability
}

// Action is the upgrade action state.
Expand Down Expand Up @@ -81,7 +83,7 @@ func IsUpgradeable() bool {
}

// NewUpgrader creates an upgrader which is capable of performing upgrade operation
func NewUpgrader(agentInfo *info.AgentInfo, settings *artifact.Config, log *logger.Logger, closers []context.CancelFunc, reexec reexecManager, a acker, r stateReporter) *Upgrader {
func NewUpgrader(agentInfo *info.AgentInfo, settings *artifact.Config, log *logger.Logger, closers []context.CancelFunc, reexec reexecManager, a acker, r stateReporter, caps capabilities.Capability) *Upgrader {
return &Upgrader{
agentInfo: agentInfo,
settings: settings,
Expand All @@ -91,6 +93,7 @@ func NewUpgrader(agentInfo *info.AgentInfo, settings *artifact.Config, log *logg
acker: a,
reporter: r,
upgradeable: IsUpgradeable(),
caps: caps,
}
}

Expand All @@ -116,6 +119,12 @@ func (u *Upgrader) Upgrade(ctx context.Context, a Action, reexecNow bool) (err e
"running under control of the systems supervisor")
}

if u.caps != nil {
if _, err := u.caps.Apply(a); err == capabilities.ErrBlocked {
return nil
}
}

u.reportUpdating(a.Version())

sourceURI, err := u.sourceURI(a.Version(), a.SourceURI())
Expand Down
97 changes: 97 additions & 0 deletions x-pack/elastic-agent/pkg/capabilities/capabilities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package capabilities

import (
"errors"
"os"

"gopkg.in/yaml.v2"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status"
)

// Capability provides a way of applying predefined filter to object.
// It's up to capability to determine if capability is applicable on object.
type Capability interface {
// Apply applies capabilities on input and returns true if input should be completely blocked
// otherwise, false and updated input is returned
Apply(interface{}) (interface{}, error)
}

var (
// ErrBlocked is returned when capability is blocking.
ErrBlocked = errors.New("capability blocked")
)

type capabilitiesManager struct {
caps []Capability
reporter status.Reporter
}

type capabilityFactory func(*logger.Logger, *ruleDefinitions, status.Reporter) (Capability, error)

// Load loads capabilities files and prepares manager.
func Load(capsFile string, log *logger.Logger, sc status.Controller) (Capability, error) {
handlers := []capabilityFactory{
newInputsCapability,
newOutputsCapability,
newUpgradesCapability,
}

cm := &capabilitiesManager{
caps: make([]Capability, 0),
reporter: sc.RegisterWithPersistance("capabilities", true),
}

// load capabilities from file
fd, err := os.Open(capsFile)
if err != nil && !os.IsNotExist(err) {
return cm, err
}

if os.IsNotExist(err) {
log.Infof("capabilities file not found in %s", capsFile)
return cm, nil
}
defer fd.Close()

definitions := &ruleDefinitions{Capabilities: make([]ruler, 0)}
dec := yaml.NewDecoder(fd)
if err := dec.Decode(&definitions); err != nil {
return cm, err
}

// make list of handlers out of capabilities definition
for _, h := range handlers {
cap, err := h(log, definitions, cm.reporter)
if err != nil {
return nil, err
}

if cap == nil {
continue
}

cm.caps = append(cm.caps, cap)
}

return cm, nil
}

func (mgr *capabilitiesManager) Apply(in interface{}) (interface{}, error) {
var err error
// reset health on start, child caps will update to fail if needed
mgr.reporter.Update(status.Healthy)
for _, cap := range mgr.caps {
in, err = cap.Apply(in)
if err != nil {
return in, err
}
}

return in, nil
}
Loading

0 comments on commit 86a0bd8

Please sign in to comment.