Skip to content

Commit

Permalink
Only log publish event messages in trace log level under elastic-agent (
Browse files Browse the repository at this point in the history
#34391)

* Only log publish event messages in trace log level under elastic-agent.

* Add changelog entry.

* Fix changelog.

* Add same logic to processors.

(cherry picked from commit 15ff7d1)

# Conflicts:
#	libbeat/processors/actions/append.go
  • Loading branch information
blakerouse authored and mergify[bot] committed Jan 31, 2023
1 parent 66abded commit f910d68
Show file tree
Hide file tree
Showing 13 changed files with 302 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix race condition when stopping runners {pull}32433[32433]
- Fix concurrent map writes when system/process code called from reporter code {pull}32491[32491]
- Log errors from the Elastic Agent V2 client errors channel. Avoids blocking when error occurs communicating with the Elastic Agent. {pull}34392[34392]
- Only log publish event messages in trace log level under elastic-agent. {pull}34391[34391]

*Auditbeat*

Expand Down
182 changes: 182 additions & 0 deletions libbeat/processors/actions/append.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package actions

import (
"fmt"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/checks"
jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor"
"github.com/elastic/beats/v7/libbeat/publisher"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

type appendProcessor struct {
config appendProcessorConfig
logger *logp.Logger
}

type appendProcessorConfig struct {
Fields []string `config:"fields"`
TargetField string `config:"target_field"`
Values []interface{} `config:"values"`
IgnoreMissing bool `config:"ignore_missing"`
IgnoreEmptyValues bool `config:"ignore_empty_values"`
FailOnError bool `config:"fail_on_error"`
AllowDuplicate bool `config:"allow_duplicate"`
}

func init() {
processors.RegisterPlugin("append",
checks.ConfigChecked(NewAppendProcessor,
checks.RequireFields("target_field"),
),
)
jsprocessor.RegisterPlugin("AppendProcessor", NewAppendProcessor)
}

// NewAppendProcessor returns a new append processor.
func NewAppendProcessor(c *conf.C) (processors.Processor, error) {
config := appendProcessorConfig{
IgnoreMissing: false,
IgnoreEmptyValues: false,
FailOnError: true,
AllowDuplicate: true,
}
err := c.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("failed to unpack the configuration of append processor: %w", err)
}

f := &appendProcessor{
config: config,
logger: logp.NewLogger("append"),
}
return f, nil
}

func (f *appendProcessor) Run(event *beat.Event) (*beat.Event, error) {
var backup *beat.Event
if f.config.FailOnError {
backup = event.Clone()
}

err := f.appendValues(f.config.TargetField, f.config.Fields, f.config.Values, event)
if err != nil {
errMsg := fmt.Errorf("failed to append fields in append processor: %w", err)
if publisher.LogWithTrace() {
f.logger.Debug(errMsg.Error())
}
if f.config.FailOnError {
event = backup
if _, err := event.PutValue("error.message", errMsg.Error()); err != nil {
return nil, fmt.Errorf("failed to append fields in append processor: %w", err)
}
return event, err
}
}

return event, nil
}

func (f *appendProcessor) appendValues(target string, fields []string, values []interface{}, event *beat.Event) error {
var arr []interface{}

// get the existing value of target field
targetVal, err := event.GetValue(target)
if err != nil {
f.logger.Debugf("could not fetch value for key: '%s'. Therefore, all the values will be appended in a new key %s.", target, target)
} else {
targetArr, ok := targetVal.([]interface{})
if ok {
arr = append(arr, targetArr...)
} else {
arr = append(arr, targetVal)
}
}

// append the values of all the fields listed under 'fields' section
for _, field := range fields {
val, err := event.GetValue(field)
if err != nil {
if f.config.IgnoreMissing && err.Error() == "key not found" {
continue
}
return fmt.Errorf("could not fetch value for key: %s, Error: %w", field, err)
}
valArr, ok := val.([]interface{})
if ok {
arr = append(arr, valArr...)
} else {
arr = append(arr, val)
}
}

// append all the static values from 'values' section
arr = append(arr, values...)

// remove empty strings and nil from the array
if f.config.IgnoreEmptyValues {
arr = cleanEmptyValues(arr)
}

// remove duplicate values from the array
if !f.config.AllowDuplicate {
arr = removeDuplicates(arr)
}

// replace the existing target with new array
if err := event.Delete(target); err != nil && !(err.Error() == "key not found") {
return fmt.Errorf("unable to delete the target field %s due to error: %w", target, err)
}
if _, err := event.PutValue(target, arr); err != nil {
return fmt.Errorf("unable to put values in the target field %s due to error: %w", target, err)
}

return nil
}

func (f *appendProcessor) String() string {
return "append=" + fmt.Sprintf("%+v", f.config.TargetField)
}

// this function will remove all the empty strings and nil values from the array
func cleanEmptyValues(dirtyArr []interface{}) (cleanArr []interface{}) {
for _, val := range dirtyArr {
if val == "" || val == nil {
continue
}
cleanArr = append(cleanArr, val)
}
return cleanArr
}

// this function will remove all the duplicate values from the array
func removeDuplicates(dirtyArr []interface{}) (cleanArr []interface{}) {
set := make(map[interface{}]bool, 0)
for _, val := range dirtyArr {
if _, ok := set[val]; !ok {
set[val] = true
cleanArr = append(cleanArr, val)
}
}
return cleanArr
}
5 changes: 4 additions & 1 deletion libbeat/processors/actions/copy_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/checks"
jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor"
"github.com/elastic/beats/v7/libbeat/publisher"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -79,7 +80,9 @@ func (f *copyFields) Run(event *beat.Event) (*beat.Event, error) {
err := f.copyField(field.From, field.To, event)
if err != nil {
errMsg := fmt.Errorf("Failed to copy fields in copy_fields processor: %s", err)
f.logger.Debug(errMsg.Error())
if publisher.LogWithTrace() {
f.logger.Debug(errMsg.Error())
}
if f.config.FailOnError {
event = backup
event.PutValue("error.message", errMsg.Error())
Expand Down
5 changes: 4 additions & 1 deletion libbeat/processors/actions/decode_base64_field.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/checks"
jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor"
"github.com/elastic/beats/v7/libbeat/publisher"
cfg "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -84,7 +85,9 @@ func (f *decodeBase64Field) Run(event *beat.Event) (*beat.Event, error) {
err := f.decodeField(event)
if err != nil {
errMsg := fmt.Errorf("failed to decode base64 fields in processor: %v", err)
f.log.Debug(errMsg.Error())
if publisher.LogWithTrace() {
f.log.Debug(errMsg.Error())
}
if f.config.FailOnError {
event = backup
event.PutValue("error.message", errMsg.Error())
Expand Down
5 changes: 4 additions & 1 deletion libbeat/processors/actions/decompress_gzip_field.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/checks"
"github.com/elastic/beats/v7/libbeat/publisher"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -76,7 +77,9 @@ func (f *decompressGzipField) Run(event *beat.Event) (*beat.Event, error) {
err := f.decompressGzipField(event)
if err != nil {
errMsg := fmt.Errorf("Failed to decompress field in decompress_gzip_field processor: %v", err)
f.log.Debug(errMsg.Error())
if publisher.LogWithTrace() {
f.log.Debug(errMsg.Error())
}
if f.config.FailOnError {
event = backup
event.PutValue("error.message", errMsg.Error())
Expand Down
5 changes: 4 additions & 1 deletion libbeat/processors/actions/rename.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/checks"
jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor"
"github.com/elastic/beats/v7/libbeat/publisher"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -84,7 +85,9 @@ func (f *renameFields) Run(event *beat.Event) (*beat.Event, error) {
err := f.renameField(field.From, field.To, event)
if err != nil {
errMsg := fmt.Errorf("Failed to rename fields in processor: %s", err)
f.logger.Debug(errMsg.Error())
if publisher.LogWithTrace() {
f.logger.Debug(errMsg.Error())
}
if f.config.FailOnError {
event = backup
event.PutValue("error.message", errMsg.Error())
Expand Down
7 changes: 6 additions & 1 deletion libbeat/processors/actions/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ import (
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/checks"
jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor"
"github.com/elastic/beats/v7/libbeat/publisher"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)

type replaceString struct {
config replaceStringConfig
log *logp.Logger
}

type replaceStringConfig struct {
Expand Down Expand Up @@ -69,6 +71,7 @@ func NewReplaceString(c *conf.C) (processors.Processor, error) {

f := &replaceString{
config: config,
log: logp.NewLogger("replace"),
}
return f, nil
}
Expand All @@ -84,7 +87,9 @@ func (f *replaceString) Run(event *beat.Event) (*beat.Event, error) {
err := f.replaceField(field.Field, field.Pattern, field.Replacement, event)
if err != nil {
errMsg := fmt.Errorf("Failed to replace fields in processor: %s", err)
logp.Debug("replace", errMsg.Error())
if publisher.LogWithTrace() {
f.log.Debug(errMsg.Error())
}
if f.config.FailOnError {
event = backup
event.PutValue("error.message", errMsg.Error())
Expand Down
3 changes: 3 additions & 0 deletions libbeat/processors/actions/replace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"regexp"
"testing"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -140,6 +142,7 @@ func TestReplaceRun(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
f := &replaceString{
log: logp.NewLogger("replace"),
config: replaceStringConfig{
Fields: test.Fields,
IgnoreMissing: test.IgnoreMissing,
Expand Down
5 changes: 4 additions & 1 deletion libbeat/processors/urldecode/urldecode.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/checks"
jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -83,7 +84,9 @@ func (p *urlDecode) Run(event *beat.Event) (*beat.Event, error) {
err := p.decodeField(field.From, field.To, event)
if err != nil {
errMsg := fmt.Errorf("failed to decode fields in urldecode processor: %v", err)
p.log.Debug(errMsg.Error())
if publisher.LogWithTrace() {
p.log.Debug(errMsg.Error())
}
if p.config.FailOnError {
event = backup
event.PutValue("error.message", errMsg.Error())
Expand Down
60 changes: 60 additions & 0 deletions libbeat/publisher/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package publisher

import (
"github.com/elastic/beats/v7/libbeat/common/atomic"
)

var (
// underAgent is set to true with this beat is being ran under the elastic-agent
underAgent = atomic.MakeBool(false)

// underAgentTrace is set to true when the elastic-agent has placed this beat into
// trace mode (which enables logging of published events)
underAgentTrace = atomic.MakeBool(false)
)

// SetUnderAgent sets that the processing pipeline is being ran under the elastic-agent.
func SetUnderAgent(val bool) {
underAgent.Store(val)
}

// SetUnderAgentTrace sets that trace mode has been enabled by the elastic-agent.
//
// SetUnderAgent must also be called and set to true before this has an effect.
func SetUnderAgentTrace(val bool) {
underAgentTrace.Store(val)
}

// UnderAgent returns true when running under Elastic Agent.
func UnderAgent() bool {
return underAgent.Load()
}

// LogWithTrace returns true when not running under Elastic Agent always or
// only true if running under Elastic Agent with trace logging enabled.
func LogWithTrace() bool {
agent := underAgent.Load()
if agent {
trace := underAgentTrace.Load()
return trace
}
// Always true when not running under the Elastic Agent.
return true
}
Loading

0 comments on commit f910d68

Please sign in to comment.