Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for maximum size for Output of checks #5233

Merged
merged 14 commits into from
Jun 26, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,7 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
base.CoordinateUpdateBatchSize = a.config.ConsulCoordinateUpdateBatchSize
base.CoordinateUpdateMaxBatches = a.config.ConsulCoordinateUpdateMaxBatches
base.CoordinateUpdatePeriod = a.config.ConsulCoordinateUpdatePeriod
base.CheckOutputMaxSize = a.config.CheckOutputMaxSize

base.RaftConfig.HeartbeatTimeout = a.config.ConsulRaftHeartbeatTimeout
base.RaftConfig.LeaderLeaseTimeout = a.config.ConsulRaftLeaderLeaseTimeout
Expand Down Expand Up @@ -971,6 +972,9 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
if a.config.Bootstrap {
base.Bootstrap = true
}
if a.config.CheckOutputMaxSize > 0 {
base.CheckOutputMaxSize = a.config.CheckOutputMaxSize
}
if a.config.RejoinAfterLeave {
base.RejoinAfterLeave = true
}
Expand Down Expand Up @@ -2239,6 +2243,13 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,

// Check if already registered
if chkType != nil {
maxOutputSize := a.config.CheckOutputMaxSize
if maxOutputSize == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may be unnecessary. How could a.config.CheckOutputMaxSize end up as 0? The config builder has a default value of 4096 and returns an error if the input is < 1.

maxOutputSize = checks.DefaultBufSize
}
if chkType.OutputMaxSize > 0 && maxOutputSize > chkType.OutputMaxSize {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you mention in your description, this will only override the agent check if the check's OutputMaxSize is less than the agent's.

Could you expand more on why we shouldn't allow the check's max to override the agent's max?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@freddygv I wanted to be as conservative as possible (meaning that, for instance, if someone is storing those results in a DB and assumed the size will never be more than 4k, we won't break this assumption), while still allowing large infrastructures to reduce the size of their check output, since it can have a huge impact when check output varies a lot (for instance, the normal result of a check is 'OK', while broken checks return large stack traces). Ask @orarnon, for instance, who had this issue (we had it as well on very large clusters in the past).

Increasing this value is trivial and could be done in another patch if someone requests it, but if you really want, we can remove this limit in this PR as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pierresouchay Thanks. Can you please expand on the need for the check-specific max output size? Would adding a max for the agent alone be sufficient?

The implementation would be simpler if we only set this max once. Additionally, there is a UX mismatch where someone can discard output at the agent level, but at the check level the max size can only be brought down to 1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the reason for that is the serialization of the payload: if the user specifies 0, it means no value has been specified in the payload (for instance, for a user not setting OutputMaxSize in the check definition), thus preserving backward compatibility.
As you pointed out in a previous comment, if the user wants to completely disable check output, it can be done by disabling check output entirely.

maxOutputSize = chkType.OutputMaxSize
}
switch {

case chkType.IsTTL():
Expand Down Expand Up @@ -2285,6 +2296,7 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
Interval: chkType.Interval,
Timeout: chkType.Timeout,
Logger: a.logger,
OutputMaxSize: maxOutputSize,
TLSClientConfig: tlsClientConfig,
}
http.Start()
Expand Down Expand Up @@ -2352,7 +2364,7 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}

if a.dockerClient == nil {
dc, err := checks.NewDockerClient(os.Getenv("DOCKER_HOST"), checks.BufSize)
dc, err := checks.NewDockerClient(os.Getenv("DOCKER_HOST"), int64(maxOutputSize))
if err != nil {
a.logger.Printf("[ERR] agent: error creating docker client: %s", err)
return err
Expand Down Expand Up @@ -2387,14 +2399,14 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
check.CheckID, checks.MinInterval)
chkType.Interval = checks.MinInterval
}

monitor := &checks.CheckMonitor{
Notify: a.State,
CheckID: check.CheckID,
ScriptArgs: chkType.ScriptArgs,
Interval: chkType.Interval,
Timeout: chkType.Timeout,
Logger: a.logger,
Notify: a.State,
CheckID: check.CheckID,
ScriptArgs: chkType.ScriptArgs,
Interval: chkType.Interval,
Timeout: chkType.Timeout,
Logger: a.logger,
OutputMaxSize: maxOutputSize,
}
monitor.Start()
a.checkMonitors[check.CheckID] = monitor
Expand Down
4 changes: 2 additions & 2 deletions agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,9 +716,9 @@ func (s *HTTPServer) AgentCheckUpdate(resp http.ResponseWriter, req *http.Reques
}

total := len(update.Output)
if total > checks.BufSize {
if total > checks.DefaultBufSize {
pierresouchay marked this conversation as resolved.
Show resolved Hide resolved
update.Output = fmt.Sprintf("%s ... (captured %d of %d bytes)",
update.Output[:checks.BufSize], checks.BufSize, total)
update.Output[:checks.DefaultBufSize], checks.DefaultBufSize, total)
}

checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/update/"))
Expand Down
4 changes: 2 additions & 2 deletions agent/agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2340,7 +2340,7 @@ func TestAgent_UpdateCheck(t *testing.T) {
t.Run("log output limit", func(t *testing.T) {
args := checkUpdate{
Status: api.HealthPassing,
Output: strings.Repeat("-= bad -=", 5*checks.BufSize),
Output: strings.Repeat("-= bad -=", 5*checks.DefaultBufSize),
}
req, _ := http.NewRequest("PUT", "/v1/agent/check/update/test", jsonReader(args))
resp := httptest.NewRecorder()
Expand All @@ -2359,7 +2359,7 @@ func TestAgent_UpdateCheck(t *testing.T) {
// rough check that the output buffer was cut down so this test
// isn't super brittle.
state := a.State.Checks()["test"]
if state.Status != api.HealthPassing || len(state.Output) > 2*checks.BufSize {
if state.Status != api.HealthPassing || len(state.Output) > 2*checks.DefaultBufSize {
t.Fatalf("bad: %v", state)
}
})
Expand Down
29 changes: 17 additions & 12 deletions agent/checks/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ const (
// Otherwise we risk fork bombing a system.
MinInterval = time.Second

// BufSize is the maximum size of the captured
// check output. Prevents an enormous buffer
// DefaultBufSize is the maximum size of the captured
// check output by default. Prevents an enormous buffer
// from being captured
BufSize = 4 * 1024 // 4KB
DefaultBufSize = 4 * 1024 // 4KB

// UserAgent is the value of the User-Agent header
// for HTTP health checks.
Expand All @@ -56,13 +56,14 @@ type CheckNotifier interface {
// determine the health of a given check. It is compatible with
// nagios plugins and expects the output in the same format.
type CheckMonitor struct {
Notify CheckNotifier
CheckID types.CheckID
Script string
ScriptArgs []string
Interval time.Duration
Timeout time.Duration
Logger *log.Logger
Notify CheckNotifier
CheckID types.CheckID
Script string
ScriptArgs []string
Interval time.Duration
Timeout time.Duration
Logger *log.Logger
OutputMaxSize int

stop bool
stopCh chan struct{}
Expand Down Expand Up @@ -122,7 +123,7 @@ func (c *CheckMonitor) check() {
}

// Collect the output
output, _ := circbuf.NewBuffer(BufSize)
output, _ := circbuf.NewBuffer(int64(c.OutputMaxSize))
cmd.Stdout = output
cmd.Stderr = output
exec.SetSysProcAttr(cmd)
Expand Down Expand Up @@ -303,6 +304,7 @@ type CheckHTTP struct {
Timeout time.Duration
Logger *log.Logger
TLSClientConfig *tls.Config
OutputMaxSize int

httpClient *http.Client
stop bool
Expand Down Expand Up @@ -339,6 +341,9 @@ func (c *CheckHTTP) Start() {
} else if c.Interval < 10*time.Second {
c.httpClient.Timeout = c.Interval
}
if c.OutputMaxSize < 1 {
c.OutputMaxSize = DefaultBufSize
}
}

c.stop = false
Expand Down Expand Up @@ -413,7 +418,7 @@ func (c *CheckHTTP) check() {
defer resp.Body.Close()

// Read the response into a circular buffer to limit the size
output, _ := circbuf.NewBuffer(BufSize)
output, _ := circbuf.NewBuffer(int64(c.OutputMaxSize))
if _, err := io.Copy(output, resp.Body); err != nil {
c.Logger.Printf("[WARN] agent: Check %q error while reading body: %s", c.CheckID, err)
}
Expand Down
121 changes: 82 additions & 39 deletions agent/checks/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ func TestCheckMonitor_Script(t *testing.T) {
t.Run(tt.status, func(t *testing.T) {
notif := mock.NewNotify()
check := &CheckMonitor{
Notify: notif,
CheckID: types.CheckID("foo"),
Script: tt.script,
Interval: 25 * time.Millisecond,
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
Notify: notif,
CheckID: types.CheckID("foo"),
Script: tt.script,
Interval: 25 * time.Millisecond,
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
OutputMaxSize: DefaultBufSize,
}
check.Start()
defer check.Stop()
Expand Down Expand Up @@ -79,11 +80,12 @@ func TestCheckMonitor_Args(t *testing.T) {
t.Run(tt.status, func(t *testing.T) {
notif := mock.NewNotify()
check := &CheckMonitor{
Notify: notif,
CheckID: types.CheckID("foo"),
ScriptArgs: tt.args,
Interval: 25 * time.Millisecond,
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
Notify: notif,
CheckID: types.CheckID("foo"),
ScriptArgs: tt.args,
Interval: 25 * time.Millisecond,
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
OutputMaxSize: DefaultBufSize,
}
check.Start()
defer check.Stop()
Expand All @@ -103,12 +105,13 @@ func TestCheckMonitor_Timeout(t *testing.T) {
// t.Parallel() // timing test. no parallel
notif := mock.NewNotify()
check := &CheckMonitor{
Notify: notif,
CheckID: types.CheckID("foo"),
ScriptArgs: []string{"sh", "-c", "sleep 1 && exit 0"},
Interval: 50 * time.Millisecond,
Timeout: 25 * time.Millisecond,
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
Notify: notif,
CheckID: types.CheckID("foo"),
ScriptArgs: []string{"sh", "-c", "sleep 1 && exit 0"},
Interval: 50 * time.Millisecond,
Timeout: 25 * time.Millisecond,
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
OutputMaxSize: DefaultBufSize,
}
check.Start()
defer check.Stop()
Expand All @@ -128,11 +131,12 @@ func TestCheckMonitor_RandomStagger(t *testing.T) {
// t.Parallel() // timing test. no parallel
notif := mock.NewNotify()
check := &CheckMonitor{
Notify: notif,
CheckID: types.CheckID("foo"),
ScriptArgs: []string{"sh", "-c", "exit 0"},
Interval: 25 * time.Millisecond,
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
Notify: notif,
CheckID: types.CheckID("foo"),
ScriptArgs: []string{"sh", "-c", "exit 0"},
Interval: 25 * time.Millisecond,
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
OutputMaxSize: DefaultBufSize,
}
check.Start()
defer check.Stop()
Expand All @@ -153,19 +157,20 @@ func TestCheckMonitor_LimitOutput(t *testing.T) {
t.Parallel()
notif := mock.NewNotify()
check := &CheckMonitor{
Notify: notif,
CheckID: types.CheckID("foo"),
ScriptArgs: []string{"od", "-N", "81920", "/dev/urandom"},
Interval: 25 * time.Millisecond,
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
Notify: notif,
CheckID: types.CheckID("foo"),
ScriptArgs: []string{"od", "-N", "81920", "/dev/urandom"},
Interval: 25 * time.Millisecond,
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
OutputMaxSize: DefaultBufSize,
}
check.Start()
defer check.Stop()

time.Sleep(50 * time.Millisecond)

// Allow for extra bytes for the truncation message
if len(notif.Output("foo")) > BufSize+100 {
if len(notif.Output("foo")) > DefaultBufSize+100 {
t.Fatalf("output size is too long")
}
}
Expand Down Expand Up @@ -287,21 +292,22 @@ func TestCheckHTTP(t *testing.T) {
}

// Body larger than 4k limit
body := bytes.Repeat([]byte{'a'}, 2*BufSize)
body := bytes.Repeat([]byte{'a'}, 2*DefaultBufSize)
w.WriteHeader(tt.code)
w.Write(body)
}))
defer server.Close()

notif := mock.NewNotify()
check := &CheckHTTP{
Notify: notif,
CheckID: types.CheckID("foo"),
HTTP: server.URL,
Method: tt.method,
Header: tt.header,
Interval: 10 * time.Millisecond,
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
Notify: notif,
CheckID: types.CheckID("foo"),
HTTP: server.URL,
Method: tt.method,
OutputMaxSize: DefaultBufSize,
Header: tt.header,
Interval: 10 * time.Millisecond,
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
}
check.Start()
defer check.Stop()
Expand All @@ -313,15 +319,52 @@ func TestCheckHTTP(t *testing.T) {
if got, want := notif.State("foo"), tt.status; got != want {
r.Fatalf("got state %q want %q", got, want)
}
// Allow slightly more data than BufSize, for the header
if n := len(notif.Output("foo")); n > (BufSize + 256) {
r.Fatalf("output too long: %d (%d-byte limit)", n, BufSize)
// Allow slightly more data than DefaultBufSize, for the header
if n := len(notif.Output("foo")); n > (DefaultBufSize + 256) {
r.Fatalf("output too long: %d (%d-byte limit)", n, DefaultBufSize)
}
})
})
}
}

// TestCheckMaxOutputSize verifies that a CheckHTTP configured with a small
// OutputMaxSize reports only that many bytes of the response body. The test
// server answers every request with 2*DefaultBufSize 'x' bytes, and the
// reported output is expected to be the status prefix followed by exactly
// maxOutputSize 'x' characters.
func TestCheckMaxOutputSize(t *testing.T) {
	t.Parallel()
	timeout := 5 * time.Millisecond
	server := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, req *http.Request) {
		// Reply with far more data than the check is configured to keep.
		body := bytes.Repeat([]byte{'x'}, 2*DefaultBufSize)
		writer.WriteHeader(200)
		writer.Write(body)
	}))
	defer server.Close()

	notif := mock.NewNotify()
	maxOutputSize := 32
	check := &CheckHTTP{
		Notify:        notif,
		CheckID:       types.CheckID("bar"),
		HTTP:          server.URL + "/v1/agent/self",
		Timeout:       timeout,
		Interval:      2 * time.Millisecond,
		Logger:        log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
		OutputMaxSize: maxOutputSize,
	}

	check.Start()
	defer check.Stop()
	retry.Run(t, func(r *retry.R) {
		// Wait until the check has reported at least twice.
		if got, want := notif.Updates("bar"), 2; got < want {
			r.Fatalf("got %d updates want at least %d", got, want)
		}
		if got, want := notif.State("bar"), api.HealthPassing; got != want {
			r.Fatalf("got state %q want %q", got, want)
		}
		// The body portion of the output must be capped at maxOutputSize bytes.
		if got, want := notif.Output("bar"), "HTTP GET "+server.URL+"/v1/agent/self: 200 OK Output: "+strings.Repeat("x", maxOutputSize); got != want {
			r.Fatalf("got output %q want %q", got, want)
		}
	})
}

func TestCheckHTTPTimeout(t *testing.T) {
t.Parallel()
timeout := 5 * time.Millisecond
Expand Down Expand Up @@ -372,7 +415,7 @@ func TestCheckHTTP_disablesKeepAlives(t *testing.T) {
func largeBodyHandler(code int) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Body larger than 4k limit
body := bytes.Repeat([]byte{'a'}, 2*BufSize)
body := bytes.Repeat([]byte{'a'}, 2*DefaultBufSize)
w.WriteHeader(code)
w.Write(body)
})
Expand Down
Loading