Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix nagios parser to parse all perfdata and report info message. #5601

Merged
merged 6 commits into from
Mar 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 46 additions & 66 deletions plugins/inputs/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package exec
import (
"bytes"
"fmt"
"log"
"os/exec"
"path/filepath"
"runtime"
"strings"
"sync"
"syscall"
"time"

"github.com/kballard/go-shellquote"
Expand Down Expand Up @@ -61,39 +61,18 @@ func NewExec() *Exec {
}

type Runner interface {
Run(*Exec, string, telegraf.Accumulator) ([]byte, error)
Run(string, time.Duration) ([]byte, []byte, error)
}

type CommandRunner struct{}

func AddNagiosState(exitCode error, acc telegraf.Accumulator) error {
nagiosState := 0
if exitCode != nil {
exiterr, ok := exitCode.(*exec.ExitError)
if ok {
status, ok := exiterr.Sys().(syscall.WaitStatus)
if ok {
nagiosState = status.ExitStatus()
} else {
return fmt.Errorf("exec: unable to get nagios plugin exit code")
}
} else {
return fmt.Errorf("exec: unable to get nagios plugin exit code")
}
}
fields := map[string]interface{}{"state": nagiosState}
acc.AddFields("nagios_state", fields, nil)
return nil
}

func (c CommandRunner) Run(
e *Exec,
command string,
acc telegraf.Accumulator,
) ([]byte, error) {
timeout time.Duration,
) ([]byte, []byte, error) {
split_cmd, err := shellquote.Split(command)
if err != nil || len(split_cmd) == 0 {
return nil, fmt.Errorf("exec: unable to parse command, %s", err)
return nil, nil, fmt.Errorf("exec: unable to parse command, %s", err)
}

cmd := exec.Command(split_cmd[0], split_cmd[1:]...)
Expand All @@ -105,44 +84,35 @@ func (c CommandRunner) Run(
cmd.Stdout = &out
cmd.Stderr = &stderr

if err := internal.RunTimeout(cmd, e.Timeout.Duration); err != nil {
switch e.parser.(type) {
case *nagios.NagiosParser:
AddNagiosState(err, acc)
default:
var errMessage = ""
if stderr.Len() > 0 {
stderr = removeCarriageReturns(stderr)
// Limit the number of bytes.
didTruncate := false
if stderr.Len() > MaxStderrBytes {
stderr.Truncate(MaxStderrBytes)
didTruncate = true
}
if i := bytes.IndexByte(stderr.Bytes(), '\n'); i > 0 {
// Only show truncation if the newline wasn't the last character.
if i < stderr.Len()-1 {
didTruncate = true
}
stderr.Truncate(i)
}
if didTruncate {
stderr.WriteString("...")
}
runErr := internal.RunTimeout(cmd, timeout)

errMessage = fmt.Sprintf(": %s", stderr.String())
}
return nil, fmt.Errorf("exec: %s for command '%s'%s", err, command, errMessage)
}
} else {
switch e.parser.(type) {
case *nagios.NagiosParser:
AddNagiosState(nil, acc)
}
out = removeCarriageReturns(out)
if stderr.Len() > 0 {
stderr = removeCarriageReturns(stderr)
stderr = truncate(stderr)
}

out = removeCarriageReturns(out)
return out.Bytes(), nil
return out.Bytes(), stderr.Bytes(), runErr
}

func truncate(buf bytes.Buffer) bytes.Buffer {
// Limit the number of bytes.
didTruncate := false
if buf.Len() > MaxStderrBytes {
buf.Truncate(MaxStderrBytes)
didTruncate = true
}
if i := bytes.IndexByte(buf.Bytes(), '\n'); i > 0 {
// Only show truncation if the newline wasn't the last character.
if i < buf.Len()-1 {
didTruncate = true
}
buf.Truncate(i)
}
if didTruncate {
buf.WriteString("...")
}
return buf
}

// removeCarriageReturns removes all carriage returns from the input if the
Expand Down Expand Up @@ -173,21 +143,31 @@ func removeCarriageReturns(b bytes.Buffer) bytes.Buffer {

func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync.WaitGroup) {
defer wg.Done()
_, isNagios := e.parser.(*nagios.NagiosParser)

out, err := e.runner.Run(e, command, acc)
if err != nil {
out, errbuf, runErr := e.runner.Run(command, e.Timeout.Duration)
if !isNagios && runErr != nil {
err := fmt.Errorf("exec: %s for command '%s': %s", runErr, command, string(errbuf))
acc.AddError(err)
return
}

metrics, err := e.parser.Parse(out)
if err != nil {
acc.AddError(err)
} else {
for _, metric := range metrics {
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
return
}

if isNagios {
metrics, err = nagios.TryAddState(runErr, metrics)
if err != nil {
log.Printf("E! [inputs.exec] failed to add nagios state: %s", err)
}
}

for _, m := range metrics {
acc.AddMetric(m)
}
}

func (e *Exec) SampleConfig() string {
Expand Down
87 changes: 73 additions & 14 deletions plugins/inputs/exec/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"fmt"
"runtime"
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"

"github.com/influxdata/telegraf/testutil"
Expand Down Expand Up @@ -74,22 +74,21 @@ var crTests = []CarriageReturnTest{
}

type runnerMock struct {
out []byte
err error
out []byte
errout []byte
err error
}

func newRunnerMock(out []byte, err error) Runner {
func newRunnerMock(out []byte, errout []byte, err error) Runner {
return &runnerMock{
out: out,
err: err,
out: out,
errout: errout,
err: err,
}
}

func (r runnerMock) Run(e *Exec, command string, acc telegraf.Accumulator) ([]byte, error) {
if r.err != nil {
return nil, r.err
}
return r.out, nil
func (r runnerMock) Run(command string, _ time.Duration) ([]byte, []byte, error) {
return r.out, r.errout, r.err
}

func TestExec(t *testing.T) {
Expand All @@ -98,7 +97,7 @@ func TestExec(t *testing.T) {
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock([]byte(validJson), nil),
runner: newRunnerMock([]byte(validJson), nil, nil),
Commands: []string{"testcommand arg1"},
parser: parser,
}
Expand Down Expand Up @@ -127,7 +126,7 @@ func TestExecMalformed(t *testing.T) {
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock([]byte(malformedJson), nil),
runner: newRunnerMock([]byte(malformedJson), nil, nil),
Commands: []string{"badcommand arg1"},
parser: parser,
}
Expand All @@ -143,7 +142,7 @@ func TestCommandError(t *testing.T) {
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")),
runner: newRunnerMock(nil, nil, fmt.Errorf("exit status code 1")),
Commands: []string{"badcommand"},
parser: parser,
}
Expand Down Expand Up @@ -201,6 +200,66 @@ func TestExecCommandWithoutGlobAndPath(t *testing.T) {
acc.AssertContainsFields(t, "metric", fields)
}

func TestTruncate(t *testing.T) {
tests := []struct {
name string
bufF func() *bytes.Buffer
expF func() *bytes.Buffer
}{
{
name: "should not truncate",
bufF: func() *bytes.Buffer {
var b bytes.Buffer
b.WriteString("hello world")
return &b
},
expF: func() *bytes.Buffer {
var b bytes.Buffer
b.WriteString("hello world")
return &b
},
},
{
name: "should truncate up to the new line",
bufF: func() *bytes.Buffer {
var b bytes.Buffer
b.WriteString("hello world\nand all the people")
return &b
},
expF: func() *bytes.Buffer {
var b bytes.Buffer
b.WriteString("hello world...")
return &b
},
},
{
name: "should truncate to the MaxStderrBytes",
bufF: func() *bytes.Buffer {
var b bytes.Buffer
for i := 0; i < 2*MaxStderrBytes; i++ {
b.WriteByte('b')
}
return &b
},
expF: func() *bytes.Buffer {
var b bytes.Buffer
for i := 0; i < MaxStderrBytes; i++ {
b.WriteByte('b')
}
b.WriteString("...")
return &b
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
res := truncate(*tt.bufF())
require.Equal(t, tt.expF().Bytes(), res.Bytes())
})
}
}

func TestRemoveCarriageReturns(t *testing.T) {
if runtime.GOOS == "windows" {
// Test that all carriage returns are removed
Expand Down
Loading