Skip to content

Commit

Permalink
[Elastic Agent] Add elastic agent ID and version to events from fileb…
Browse files Browse the repository at this point in the history
…eat and metricbeat. (#21543)

* Add elastic agent ID and version to events from filebeat and metricbeat.

* Add changelog and fix inputs.
  • Loading branch information
blakerouse authored Oct 6, 2020
1 parent 3f6c36a commit f5d13aa
Show file tree
Hide file tree
Showing 20 changed files with 301 additions and 69 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@
- Send `fleet.host.id` to Endpoint Security {pull}21042[21042]
- Add `install` and `uninstall` subcommands {pull}21206[21206]
- Send updating state {pull}21461[21461]
- Add `elastic.agent.id` and `elastic.agent.version` to published events from filebeat and metricbeat {pull}21543[21543]
- Add `upgrade` subcommand to perform upgrade of installed Elastic Agent {pull}21425[21425]
11 changes: 7 additions & 4 deletions x-pack/elastic-agent/pkg/agent/application/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"
"sync"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler"
Expand All @@ -18,7 +19,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

type decoratorFunc = func(string, *transpiler.AST, []program.Program) ([]program.Program, error)
type decoratorFunc = func(*info.AgentInfo, string, *transpiler.AST, []program.Program) ([]program.Program, error)
type filterFunc = func(*logger.Logger, *transpiler.AST) error

type reloadable interface {
Expand All @@ -36,6 +37,7 @@ type programsDispatcher interface {

type emitterController struct {
logger *logger.Logger
agentInfo *info.AgentInfo
controller composable.Controller
router programsDispatcher
modifiers *configModifiers
Expand Down Expand Up @@ -112,14 +114,14 @@ func (e *emitterController) update() error {

e.logger.Debug("Converting single configuration into specific programs configuration")

programsToRun, err := program.Programs(ast)
programsToRun, err := program.Programs(e.agentInfo, ast)
if err != nil {
return err
}

for _, decorator := range e.modifiers.Decorators {
for outputType, ptr := range programsToRun {
programsToRun[outputType], err = decorator(outputType, ast, ptr)
programsToRun[outputType], err = decorator(e.agentInfo, outputType, ast, ptr)
if err != nil {
return err
}
Expand All @@ -135,12 +137,13 @@ func (e *emitterController) update() error {
return e.router.Dispatch(ast.HashStr(), programsToRun)
}

func emitter(ctx context.Context, log *logger.Logger, controller composable.Controller, router programsDispatcher, modifiers *configModifiers, reloadables ...reloadable) (emitterFunc, error) {
func emitter(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, controller composable.Controller, router programsDispatcher, modifiers *configModifiers, reloadables ...reloadable) (emitterFunc, error) {
log.Debugf("Supported programs: %s", strings.Join(program.KnownProgramNames(), ", "))

init, _ := transpiler.NewVars(map[string]interface{}{})
ctrl := &emitterController{
logger: log,
agentInfo: agentInfo,
controller: controller,
router: router,
modifiers: modifiers,
Expand Down
12 changes: 12 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/info/agent_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package info

import "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release"

// AgentInfo is a collection of information about agent.
type AgentInfo struct {
agentID string
Expand Down Expand Up @@ -44,3 +46,13 @@ func ForceNewAgentInfo() (*AgentInfo, error) {
func (i *AgentInfo) AgentID() string {
return i.agentID
}

// Version returns the version for this Agent.
func (*AgentInfo) Version() string {
return release.Version()
}

// Snapshot returns if this version is a snapshot.
func (*AgentInfo) Snapshot() bool {
return release.Snapshot()
}
45 changes: 26 additions & 19 deletions x-pack/elastic-agent/pkg/agent/application/inspect_output_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
"context"
"fmt"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler"

"github.com/elastic/beats/v7/libbeat/logp"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
Expand All @@ -37,14 +38,19 @@ func NewInspectOutputCmd(configPath, output, program string) (*InspectOutputCmd,

// Execute tries to enroll the agent into Fleet.
func (c *InspectOutputCmd) Execute() error {
agentInfo, err := info.NewAgentInfo()
if err != nil {
return err
}

if c.output == "" {
return c.inspectOutputs()
return c.inspectOutputs(agentInfo)
}

return c.inspectOutput()
return c.inspectOutput(agentInfo)
}

func (c *InspectOutputCmd) inspectOutputs() error {
func (c *InspectOutputCmd) inspectOutputs(agentInfo *info.AgentInfo) error {
rawConfig, err := loadConfig(c.cfgPath)
if err != nil {
return err
Expand All @@ -61,7 +67,7 @@ func (c *InspectOutputCmd) inspectOutputs() error {
}

if isStandalone(cfg.Fleet) {
return listOutputsFromConfig(l, rawConfig)
return listOutputsFromConfig(l, agentInfo, rawConfig)
}

fleetConfig, err := loadFleetConfig(rawConfig)
Expand All @@ -71,11 +77,11 @@ func (c *InspectOutputCmd) inspectOutputs() error {
return fmt.Errorf("no fleet config retrieved yet")
}

return listOutputsFromMap(l, fleetConfig)
return listOutputsFromMap(l, agentInfo, fleetConfig)
}

func listOutputsFromConfig(log *logger.Logger, cfg *config.Config) error {
programsGroup, err := getProgramsFromConfig(log, cfg)
func listOutputsFromConfig(log *logger.Logger, agentInfo *info.AgentInfo, cfg *config.Config) error {
programsGroup, err := getProgramsFromConfig(log, agentInfo, cfg)
if err != nil {
return err

Expand All @@ -88,16 +94,16 @@ func listOutputsFromConfig(log *logger.Logger, cfg *config.Config) error {
return nil
}

func listOutputsFromMap(log *logger.Logger, cfg map[string]interface{}) error {
func listOutputsFromMap(log *logger.Logger, agentInfo *info.AgentInfo, cfg map[string]interface{}) error {
c, err := config.NewConfigFrom(cfg)
if err != nil {
return err
}

return listOutputsFromConfig(log, c)
return listOutputsFromConfig(log, agentInfo, c)
}

func (c *InspectOutputCmd) inspectOutput() error {
func (c *InspectOutputCmd) inspectOutput(agentInfo *info.AgentInfo) error {
rawConfig, err := loadConfig(c.cfgPath)
if err != nil {
return err
Expand All @@ -114,7 +120,7 @@ func (c *InspectOutputCmd) inspectOutput() error {
}

if isStandalone(cfg.Fleet) {
return printOutputFromConfig(l, c.output, c.program, rawConfig)
return printOutputFromConfig(l, agentInfo, c.output, c.program, rawConfig)
}

fleetConfig, err := loadFleetConfig(rawConfig)
Expand All @@ -124,11 +130,11 @@ func (c *InspectOutputCmd) inspectOutput() error {
return fmt.Errorf("no fleet config retrieved yet")
}

return printOutputFromMap(l, c.output, c.program, fleetConfig)
return printOutputFromMap(l, agentInfo, c.output, c.program, fleetConfig)
}

func printOutputFromConfig(log *logger.Logger, output, programName string, cfg *config.Config) error {
programsGroup, err := getProgramsFromConfig(log, cfg)
func printOutputFromConfig(log *logger.Logger, agentInfo *info.AgentInfo, output, programName string, cfg *config.Config) error {
programsGroup, err := getProgramsFromConfig(log, agentInfo, cfg)
if err != nil {
return err

Expand Down Expand Up @@ -164,16 +170,16 @@ func printOutputFromConfig(log *logger.Logger, output, programName string, cfg *

}

func printOutputFromMap(log *logger.Logger, output, programName string, cfg map[string]interface{}) error {
func printOutputFromMap(log *logger.Logger, agentInfo *info.AgentInfo, output, programName string, cfg map[string]interface{}) error {
c, err := config.NewConfigFrom(cfg)
if err != nil {
return err
}

return printOutputFromConfig(log, output, programName, c)
return printOutputFromConfig(log, agentInfo, output, programName, c)
}

func getProgramsFromConfig(log *logger.Logger, cfg *config.Config) (map[string][]program.Program, error) {
func getProgramsFromConfig(log *logger.Logger, agentInfo *info.AgentInfo, cfg *config.Config) (map[string][]program.Program, error) {
monitor := noop.NewMonitor()
router := &inmemRouter{}
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -186,6 +192,7 @@ func getProgramsFromConfig(log *logger.Logger, cfg *config.Config) (map[string][
emit, err := emitter(
ctx,
log,
agentInfo,
composableWaiter,
router,
&configModifiers{
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func newLocal(
emit, err := emitter(
localApplication.bgContext,
log,
agentInfo,
composableCtrl,
router,
&configModifiers{
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func newManaged(
emit, err := emitter(
managedApplication.bgContext,
log,
agentInfo,
composableCtrl,
router,
&configModifiers{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
Expand All @@ -32,8 +33,9 @@ func TestManagedModeRouting(t *testing.T) {

log, _ := logger.New("")
router, _ := newRouter(log, streamFn)
agentInfo, _ := info.NewAgentInfo()
composableCtrl, _ := composable.New(log, nil)
emit, err := emitter(ctx, log, composableCtrl, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}})
emit, err := emitter(ctx, log, agentInfo, composableCtrl, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}})
require.NoError(t, err)

actionDispatcher, err := newActionDispatcher(ctx, log, &handlerDefault{log: log})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package application
import (
"fmt"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler"
)
Expand All @@ -28,7 +29,7 @@ const (
defaultOutputName = "default"
)

func injectMonitoring(outputGroup string, rootAst *transpiler.AST, programsToRun []program.Program) ([]program.Program, error) {
func injectMonitoring(agentInfo *info.AgentInfo, outputGroup string, rootAst *transpiler.AST, programsToRun []program.Program) ([]program.Program, error) {
var err error
monitoringProgram := program.Program{
Spec: program.Spec{
Expand Down Expand Up @@ -63,7 +64,7 @@ func injectMonitoring(outputGroup string, rootAst *transpiler.AST, programsToRun
}

ast := rootAst.Clone()
if err := getMonitoringRule(monitoringOutputName).Apply(ast); err != nil {
if err := getMonitoringRule(monitoringOutputName).Apply(agentInfo, ast); err != nil {
return programsToRun, err
}

Expand Down Expand Up @@ -93,6 +94,7 @@ func getMonitoringRule(outputName string) *transpiler.RuleList {
return transpiler.NewRuleList(
transpiler.Copy(monitoringOutputSelector, outputKey),
transpiler.Rename(fmt.Sprintf("%s.%s", outputsKey, outputName), elasticsearchKey),
transpiler.InjectAgentInfo(),
transpiler.Filter(monitoringKey, programsKey, outputKey),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,30 @@ package application
import (
"testing"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler"
)

func TestMonitoringInjection(t *testing.T) {
agentInfo, err := info.NewAgentInfo()
if err != nil {
t.Fatal(err)
}
ast, err := transpiler.NewAST(inputConfigMap)
if err != nil {
t.Fatal(err)
}

programsToRun, err := program.Programs(ast)
programsToRun, err := program.Programs(agentInfo, ast)
if err != nil {
t.Fatal(err)
}

GROUPLOOP:
for group, ptr := range programsToRun {
programsCount := len(ptr)
newPtr, err := injectMonitoring(group, ast, ptr)
newPtr, err := injectMonitoring(agentInfo, group, ast, ptr)
if err != nil {
t.Error(err)
continue GROUPLOOP
Expand Down Expand Up @@ -83,20 +88,24 @@ GROUPLOOP:
}

func TestMonitoringInjectionDefaults(t *testing.T) {
agentInfo, err := info.NewAgentInfo()
if err != nil {
t.Fatal(err)
}
ast, err := transpiler.NewAST(inputConfigMapDefaults)
if err != nil {
t.Fatal(err)
}

programsToRun, err := program.Programs(ast)
programsToRun, err := program.Programs(agentInfo, ast)
if err != nil {
t.Fatal(err)
}

GROUPLOOP:
for group, ptr := range programsToRun {
programsCount := len(ptr)
newPtr, err := injectMonitoring(group, ast, ptr)
newPtr, err := injectMonitoring(agentInfo, group, ast, ptr)
if err != nil {
t.Error(err)
continue GROUPLOOP
Expand Down Expand Up @@ -154,20 +163,24 @@ GROUPLOOP:
}

func TestMonitoringInjectionDisabled(t *testing.T) {
agentInfo, err := info.NewAgentInfo()
if err != nil {
t.Fatal(err)
}
ast, err := transpiler.NewAST(inputConfigMapDisabled)
if err != nil {
t.Fatal(err)
}

programsToRun, err := program.Programs(ast)
programsToRun, err := program.Programs(agentInfo, ast)
if err != nil {
t.Fatal(err)
}

GROUPLOOP:
for group, ptr := range programsToRun {
programsCount := len(ptr)
newPtr, err := injectMonitoring(group, ast, ptr)
newPtr, err := injectMonitoring(agentInfo, group, ast, ptr)
if err != nil {
t.Error(err)
continue GROUPLOOP
Expand Down
Loading

0 comments on commit f5d13aa

Please sign in to comment.