Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement WarningThreshold for TCP/HTTP HealthChecks #4576

Closed
42 changes: 27 additions & 15 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2268,6 +2268,11 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
existing.Stop()
delete(a.checkHTTPs, check.CheckID)
}
if chkType.WarningThreshold >= chkType.Interval {
a.logger.Println(fmt.Sprintf("[WARN] agent: check HTTP '%s' has WarningThreshold=%v invalid (must be > 0 and < %v), ignoring",
check.CheckID, chkType.WarningThreshold, chkType.Interval))
chkType.WarningThreshold = time.Duration(0 * time.Second)
}
if chkType.Interval < checks.MinInterval {
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
check.CheckID, checks.MinInterval))
Expand All @@ -2277,15 +2282,16 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
tlsClientConfig := a.tlsConfigurator.OutgoingTLSConfigForCheck(chkType.TLSSkipVerify)

http := &checks.CheckHTTP{
Notify: a.State,
CheckID: check.CheckID,
HTTP: chkType.HTTP,
Header: chkType.Header,
Method: chkType.Method,
Interval: chkType.Interval,
Timeout: chkType.Timeout,
Logger: a.logger,
TLSClientConfig: tlsClientConfig,
Notify: a.State,
CheckID: check.CheckID,
HTTP: chkType.HTTP,
Header: chkType.Header,
Method: chkType.Method,
Interval: chkType.Interval,
Timeout: chkType.Timeout,
Logger: a.logger,
TLSClientConfig: tlsClientConfig,
WarningThreshold: chkType.WarningThreshold,
}
http.Start()
a.checkHTTPs[check.CheckID] = http
Expand All @@ -2295,19 +2301,25 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
existing.Stop()
delete(a.checkTCPs, check.CheckID)
}
if chkType.WarningThreshold >= chkType.Interval {
a.logger.Println(fmt.Sprintf("[WARN] agent: check TCP '%s' has WarningThreshold=%v invalid (must be > 0 and < %v), ignoring",
check.CheckID, chkType.WarningThreshold, chkType.Interval))
chkType.WarningThreshold = time.Duration(0 * time.Second)
}
if chkType.Interval < checks.MinInterval {
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
check.CheckID, checks.MinInterval))
chkType.Interval = checks.MinInterval
}

tcp := &checks.CheckTCP{
Notify: a.State,
CheckID: check.CheckID,
TCP: chkType.TCP,
Interval: chkType.Interval,
Timeout: chkType.Timeout,
Logger: a.logger,
Notify: a.State,
CheckID: check.CheckID,
TCP: chkType.TCP,
Interval: chkType.Interval,
Timeout: chkType.Timeout,
Logger: a.logger,
WarningThreshold: chkType.WarningThreshold,
}
tcp.Start()
a.checkTCPs[check.CheckID] = tcp
Expand Down
85 changes: 85 additions & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1336,6 +1336,91 @@ func TestAgent_HTTPCheck_TLSSkipVerify(t *testing.T) {

}

func TestAgent_HTTPCheck_WarningThreshold(t *testing.T) {
t.Parallel()
durationMs := 1 * time.Nanosecond
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(durationMs * 2)
fmt.Fprintln(w, "GOOD")
})
server := httptest.NewServer(handler)
defer server.Close()

a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()

health := &structs.HealthCheck{
Node: "foo",
CheckID: "WarningThreshold",
Name: "WarningThreshold check",
Status: api.HealthCritical,
}
interval := 1 * time.Second
chk := &structs.CheckType{
HTTP: server.URL,
Interval: interval,
WarningThreshold: durationMs,
}

err := a.AddCheck(health, chk, false, "", ConfigSourceLocal)
if err != nil {
t.Fatalf("err: %v", err)
}

retry.Run(t, func(r *retry.R) {
status := a.State.Checks()["WarningThreshold"]
if status.Status != api.HealthWarning {
r.Fatalf("bad: %v", status.Status)
}
if !strings.Contains(status.Output, "GOOD") {
r.Fatalf("bad: %v", status.Output)
}
})
}

func TestAgent_TCPCheck_WarningThreshold(t *testing.T) {
t.Parallel()
durationMs := 1 * time.Nanosecond
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(durationMs * 2)
fmt.Fprintln(w, "GOOD")
})
server := httptest.NewServer(handler)
defer server.Close()

a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()

health := &structs.HealthCheck{
Node: "foo",
CheckID: "WarningThreshold",
Name: "WarningThreshold check",
Status: api.HealthCritical,
}
host := server.URL[7:] // We remove leading http://
interval := 1 * time.Second
chk := &structs.CheckType{
TCP: host,
Interval: interval,
WarningThreshold: durationMs,
}

err := a.AddCheck(health, chk, false, "", ConfigSourceLocal)
if err != nil {
t.Fatalf("err: %v", err)
}

retry.Run(t, func(r *retry.R) {
status := a.State.Checks()["WarningThreshold"]
if status.Status != api.HealthWarning {
r.Fatalf("bad: %v", status.Status)
}
if !strings.Contains(status.Output, "slow") {
r.Fatalf("bad: %v", status.Output)
}
})
}

func TestAgent_HTTPCheck_EnableAgentTLSForChecks(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 4 additions & 0 deletions agent/catalog_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func TestCatalogRegister_Service_InvalidAddress(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")

for _, addr := range []string{"0.0.0.0", "::", "[::]"} {
t.Run("addr "+addr, func(t *testing.T) {
Expand All @@ -45,6 +46,7 @@ func TestCatalogDeregister(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")

// Register node
args := &structs.DeregisterRequest{Node: "foo"}
Expand All @@ -64,6 +66,7 @@ func TestCatalogDatacenters(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")

retry.Run(t, func(r *retry.R) {
req, _ := http.NewRequest("GET", "/v1/catalog/datacenters", nil)
Expand Down Expand Up @@ -483,6 +486,7 @@ func TestCatalogServices_NodeMetaFilter(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")

// Register node
args := &structs.RegisterRequest{
Expand Down
52 changes: 36 additions & 16 deletions agent/checks/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,15 +294,16 @@ func (c *CheckTTL) SetStatus(status, output string) {
// The check is critical if the response code is anything else
// or if the request returns an error
type CheckHTTP struct {
Notify CheckNotifier
CheckID types.CheckID
HTTP string
Header map[string][]string
Method string
Interval time.Duration
Timeout time.Duration
Logger *log.Logger
TLSClientConfig *tls.Config
Notify CheckNotifier
CheckID types.CheckID
HTTP string
Header map[string][]string
Method string
Interval time.Duration
Timeout time.Duration
Logger *log.Logger
TLSClientConfig *tls.Config
WarningThreshold time.Duration

httpClient *http.Client
stop bool
Expand Down Expand Up @@ -379,6 +380,7 @@ func (c *CheckHTTP) check() {
method = "GET"
}

start := time.Now()
req, err := http.NewRequest(method, c.HTTP, nil)
if err != nil {
c.Logger.Printf("[WARN] agent: Check %q HTTP request failed: %s", c.CheckID, err)
Expand Down Expand Up @@ -423,6 +425,14 @@ func (c *CheckHTTP) check() {

if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
// PASSING (2xx)
if c.WarningThreshold > 0 {
elapsed := time.Since(start)
if elapsed > c.WarningThreshold {
c.Logger.Printf("[WARN] agent: Check %q is now warning because too slow (%q > %q)", c.CheckID, elapsed, c.WarningThreshold)
c.Notify.UpdateCheck(c.CheckID, api.HealthWarning, result)
return
}
}
c.Logger.Printf("[DEBUG] agent: Check %q is passing", c.CheckID)
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, result)

Expand All @@ -445,12 +455,13 @@ func (c *CheckHTTP) check() {
// The check is passing if the connection succeeds
// The check is critical if the connection returns an error
type CheckTCP struct {
Notify CheckNotifier
CheckID types.CheckID
TCP string
Interval time.Duration
Timeout time.Duration
Logger *log.Logger
Notify CheckNotifier
CheckID types.CheckID
TCP string
Interval time.Duration
Timeout time.Duration
Logger *log.Logger
WarningThreshold time.Duration

dialer *net.Dialer
stop bool
Expand Down Expand Up @@ -511,13 +522,22 @@ func (c *CheckTCP) run() {

// check is invoked periodically to perform the TCP check
func (c *CheckTCP) check() {
start := time.Now()
conn, err := c.dialer.Dial(`tcp`, c.TCP)
if err != nil {
c.Logger.Printf("[WARN] agent: Check %q socket connection failed: %s", c.CheckID, err)
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
return
}
conn.Close()
defer conn.Close()
if c.WarningThreshold > 0 {
elapsed := time.Since(start)
if elapsed > c.WarningThreshold {
c.Logger.Printf("[WARN] agent: Check %q was too slow (took %q > %q) state is warning", c.CheckID, elapsed, c.WarningThreshold)
c.Notify.UpdateCheck(c.CheckID, api.HealthWarning, fmt.Sprintf("TCP connect %s: Success, but too slow: %q", c.TCP, elapsed))
return
}
}
c.Logger.Printf("[DEBUG] agent: Check %q is passing", c.CheckID)
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("TCP connect %s: Success", c.TCP))
}
Expand Down
3 changes: 2 additions & 1 deletion agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func FixupCheckType(raw interface{}) error {
"docker_container_id": "DockerContainerID",
"tls_skip_verify": "TLSSkipVerify",
"service_id": "ServiceID",
"warningthreshold": "WarningThreshold",
})

parseDuration := func(v interface{}) (time.Duration, error) {
Expand Down Expand Up @@ -80,7 +81,7 @@ func FixupCheckType(raw interface{}) error {
}
rawMap[k] = h

case "ttl", "interval", "timeout", "deregistercriticalserviceafter":
case "ttl", "interval", "timeout", "deregistercriticalserviceafter", "warningthreshold":
d, err := parseDuration(v)
if err != nil {
return fmt.Errorf("invalid %q: %v", k, err)
Expand Down
1 change: 1 addition & 0 deletions agent/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1173,6 +1173,7 @@ func (b *Builder) checkVal(v *CheckDefinition) *structs.CheckDefinition {
AliasService: b.stringVal(v.AliasService),
Timeout: b.durationVal(fmt.Sprintf("check[%s].timeout", id), v.Timeout),
TTL: b.durationVal(fmt.Sprintf("check[%s].ttl", id), v.TTL),
WarningThreshold: b.durationVal(fmt.Sprintf("check[%s].warning_threshold", id), v.WarningThreshold),
DeregisterCriticalServiceAfter: b.durationVal(fmt.Sprintf("check[%s].deregister_critical_service_after", id), v.DeregisterCriticalServiceAfter),
}
}
Expand Down
1 change: 1 addition & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ type CheckDefinition struct {
Timeout *string `json:"timeout,omitempty" hcl:"timeout" mapstructure:"timeout"`
TTL *string `json:"ttl,omitempty" hcl:"ttl" mapstructure:"ttl"`
DeregisterCriticalServiceAfter *string `json:"deregister_critical_service_after,omitempty" hcl:"deregister_critical_service_after" mapstructure:"deregister_critical_service_after"`
WarningThreshold *string `json:"warning_threshold,omitempty" hcl:"warning_threshold" mapstructure:"warning_threshold"`
}

// ServiceConnect is the connect block within a service registration
Expand Down
Loading