Skip to content

Commit

Permalink
Adds discovery_max_stale (#4004)
Browse files Browse the repository at this point in the history
Adds a new option to allow service discovery endpoints to return stale results if configured at the agent level.
  • Loading branch information
Preetha authored Mar 30, 2018
1 parent 2df780f commit a67d27c
Show file tree
Hide file tree
Showing 11 changed files with 237 additions and 5 deletions.
29 changes: 29 additions & 0 deletions agent/catalog_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,17 @@ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (

var out structs.IndexedNodes
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Catalog.ListNodes", &args, &out); err != nil {
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()

s.agent.TranslateAddresses(args.Datacenter, out.Nodes)

// Use empty list instead of nil
Expand All @@ -127,11 +135,18 @@ func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request

var out structs.IndexedServices
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Catalog.ListServices", &args, &out); err != nil {
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_services"}, 1,
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()

// Use empty map instead of nil
if out.Services == nil {
Expand Down Expand Up @@ -172,11 +187,18 @@ func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Req
// Make the RPC request
var out structs.IndexedServiceNodes
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Catalog.ServiceNodes", &args, &out); err != nil {
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_service_nodes"}, 1,
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
s.agent.TranslateAddresses(args.Datacenter, out.ServiceNodes)

// Use empty list instead of nil
Expand Down Expand Up @@ -216,11 +238,18 @@ func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Req
// Make the RPC request
var out structs.IndexedNodeServices
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Catalog.NodeServices", &args, &out); err != nil {
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_node_services"}, 1,
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
if out.NodeServices != nil && out.NodeServices.Node != nil {
s.agent.TranslateAddresses(args.Datacenter, out.NodeServices.Node)
}
Expand Down
1 change: 1 addition & 0 deletions agent/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
DisableRemoteExec: b.boolVal(c.DisableRemoteExec),
DisableUpdateCheck: b.boolVal(c.DisableUpdateCheck),
DiscardCheckOutput: b.boolVal(c.DiscardCheckOutput),
DiscoveryMaxStale: b.durationVal("discovery_max_stale", c.DiscoveryMaxStale),
EnableAgentTLSForChecks: b.boolVal(c.EnableAgentTLSForChecks),
EnableDebug: b.boolVal(c.EnableDebug),
EnableScriptChecks: b.boolVal(c.EnableScriptChecks),
Expand Down
1 change: 1 addition & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ type Config struct {
DisableRemoteExec *bool `json:"disable_remote_exec,omitempty" hcl:"disable_remote_exec" mapstructure:"disable_remote_exec"`
DisableUpdateCheck *bool `json:"disable_update_check,omitempty" hcl:"disable_update_check" mapstructure:"disable_update_check"`
DiscardCheckOutput *bool `json:"discard_check_output" hcl:"discard_check_output" mapstructure:"discard_check_output"`
DiscoveryMaxStale *string `json:"discovery_max_stale" hcl:"discovery_max_stale" mapstructure:"discovery_max_stale"`
EnableACLReplication *bool `json:"enable_acl_replication,omitempty" hcl:"enable_acl_replication" mapstructure:"enable_acl_replication"`
EnableAgentTLSForChecks *bool `json:"enable_agent_tls_for_checks,omitempty" hcl:"enable_agent_tls_for_checks" mapstructure:"enable_agent_tls_for_checks"`
EnableDebug *bool `json:"enable_debug,omitempty" hcl:"enable_debug" mapstructure:"enable_debug"`
Expand Down
8 changes: 8 additions & 0 deletions agent/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,14 @@ type RuntimeConfig struct {
// flag: -datacenter string
Datacenter string

// Defines the maximum stale value for discovery path. Defauls to "0s".
// Discovery paths are /v1/heath/ paths
//
// If not set to 0, it will try to perform stale read and perform only a
// consistent read whenever the value is too old.
// hcl: discovery_max_stale = "duration"
DiscoveryMaxStale time.Duration

// Node name is the name we use to advertise. Defaults to hostname.
//
// NodeName is exposed via /v1/agent/self from here and
Expand Down
6 changes: 6 additions & 0 deletions agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2304,6 +2304,7 @@ func TestFullConfig(t *testing.T) {
"disable_remote_exec": true,
"disable_update_check": true,
"discard_check_output": true,
"discovery_max_stale": "5s",
"domain": "7W1xXSqd",
"dns_config": {
"allow_stale": true,
Expand Down Expand Up @@ -2740,6 +2741,7 @@ func TestFullConfig(t *testing.T) {
disable_remote_exec = true
disable_update_check = true
discard_check_output = true
discovery_max_stale = "5s"
domain = "7W1xXSqd"
dns_config {
allow_stale = true
Expand Down Expand Up @@ -3067,6 +3069,7 @@ func TestFullConfig(t *testing.T) {
"ae_interval": "10003s",
"check_deregister_interval_min": "27870s",
"check_reap_interval": "10662s",
"discovery_max_stale": "5s",
"segment_limit": 24705,
"segment_name_limit": 27046,
"sync_coordinate_interval_min": "27983s",
Expand Down Expand Up @@ -3121,6 +3124,7 @@ func TestFullConfig(t *testing.T) {
ae_interval = "10003s"
check_deregister_interval_min = "27870s"
check_reap_interval = "10662s"
discovery_max_stale = "5s"
segment_limit = 24705
segment_name_limit = 27046
sync_coordinate_interval_min = "27983s"
Expand Down Expand Up @@ -3327,6 +3331,7 @@ func TestFullConfig(t *testing.T) {
DisableRemoteExec: true,
DisableUpdateCheck: true,
DiscardCheckOutput: true,
DiscoveryMaxStale: 5 * time.Second,
EnableACLReplication: true,
EnableAgentTLSForChecks: true,
EnableDebug: true,
Expand Down Expand Up @@ -4008,6 +4013,7 @@ func TestSanitize(t *testing.T) {
"DisableRemoteExec": false,
"DisableUpdateCheck": false,
"DiscardCheckOutput": false,
"DiscoveryMaxStale": "0s",
"EnableACLReplication": false,
"EnableAgentTLSForChecks": false,
"EnableDebug": false,
Expand Down
28 changes: 28 additions & 0 deletions agent/health_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,16 @@ func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Req
// Make the RPC request
var out structs.IndexedHealthChecks
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Health.ChecksInState", &args, &out); err != nil {
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()

// Use empty list instead of nil
if out.HealthChecks == nil {
Expand Down Expand Up @@ -66,9 +73,16 @@ func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Reques
// Make the RPC request
var out structs.IndexedHealthChecks
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Health.NodeChecks", &args, &out); err != nil {
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()

// Use empty list instead of nil
if out.HealthChecks == nil {
Expand Down Expand Up @@ -104,9 +118,16 @@ func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Req
// Make the RPC request
var out structs.IndexedHealthChecks
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Health.ServiceChecks", &args, &out); err != nil {
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()

// Use empty list instead of nil
if out.HealthChecks == nil {
Expand Down Expand Up @@ -149,9 +170,16 @@ func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Requ
// Make the RPC request
var out structs.IndexedCheckServiceNodes
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if err := s.agent.RPC("Health.ServiceNodes", &args, &out); err != nil {
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()

// Filter to only passing if specified
if _, ok := params[api.HealthPassing]; ok {
Expand Down
40 changes: 38 additions & 2 deletions agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,12 @@ func setKnownLeader(resp http.ResponseWriter, known bool) {
resp.Header().Set("X-Consul-KnownLeader", s)
}

func setConsistency(resp http.ResponseWriter, consistency string) {
if consistency != "" {
resp.Header().Set("X-Consul-Effective-Consistency", consistency)
}
}

// setLastContact is used to set the last contact header
func setLastContact(resp http.ResponseWriter, last time.Duration) {
if last < 0 {
Expand All @@ -380,6 +386,7 @@ func setMeta(resp http.ResponseWriter, m *structs.QueryMeta) {
setIndex(resp, m.Index)
setLastContact(resp, m.LastContact)
setKnownLeader(resp, m.KnownLeader)
setConsistency(resp, m.ConsistencyLevel)
}

// setHeaders is used to set canonical response header fields
Expand Down Expand Up @@ -416,13 +423,42 @@ func parseWait(resp http.ResponseWriter, req *http.Request, b *structs.QueryOpti

// parseConsistency is used to parse the ?stale and ?consistent query params.
// Returns true on error
func parseConsistency(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) bool {
func (s *HTTPServer) parseConsistency(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) bool {
query := req.URL.Query()
defaults := true
if _, ok := query["stale"]; ok {
b.AllowStale = true
defaults = false
}
if _, ok := query["consistent"]; ok {
b.RequireConsistent = true
defaults = false
}
if _, ok := query["leader"]; ok {
defaults = false
}
if maxStale := query.Get("max_stale"); maxStale != "" {
dur, err := time.ParseDuration(maxStale)
if err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Invalid max_stale value %q", maxStale)
return true
}
b.MaxStaleDuration = dur
if dur.Nanoseconds() > 0 {
b.AllowStale = true
defaults = false
}
}
// No specific Consistency has been specified by caller
if defaults {
path := req.URL.Path
if strings.HasPrefix(path, "/v1/catalog") || strings.HasPrefix(path, "/v1/health") {
if s.agent.config.DiscoveryMaxStale.Nanoseconds() > 0 {
b.MaxStaleDuration = s.agent.config.DiscoveryMaxStale
b.AllowStale = true
}
}
}
if b.AllowStale && b.RequireConsistent {
resp.WriteHeader(http.StatusBadRequest)
Expand Down Expand Up @@ -490,7 +526,7 @@ func (s *HTTPServer) parseMetaFilter(req *http.Request) map[string]string {
func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, dc *string, b *structs.QueryOptions) bool {
s.parseDC(req, dc)
s.parseToken(req, &b.Token)
if parseConsistency(resp, req, b) {
if s.parseConsistency(resp, req, b) {
return true
}
return parseWait(resp, req, b)
Expand Down
65 changes: 62 additions & 3 deletions agent/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,9 @@ func TestParseConsistency(t *testing.T) {
var b structs.QueryOptions

req, _ := http.NewRequest("GET", "/v1/catalog/nodes?stale", nil)
if d := parseConsistency(resp, req, &b); d {
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
if d := a.srv.parseConsistency(resp, req, &b); d {
t.Fatalf("unexpected done")
}

Expand All @@ -620,7 +622,7 @@ func TestParseConsistency(t *testing.T) {

b = structs.QueryOptions{}
req, _ = http.NewRequest("GET", "/v1/catalog/nodes?consistent", nil)
if d := parseConsistency(resp, req, &b); d {
if d := a.srv.parseConsistency(resp, req, &b); d {
t.Fatalf("unexpected done")
}

Expand All @@ -632,13 +634,70 @@ func TestParseConsistency(t *testing.T) {
}
}

// ensureConsistency check if consistency modes are correctly applied
// if maxStale < 0 => stale, without MaxStaleDuration
// if maxStale == 0 => no stale
// if maxStale > 0 => stale + check duration
func ensureConsistency(t *testing.T, a *TestAgent, path string, maxStale time.Duration, requireConsistent bool) {
t.Helper()
req, _ := http.NewRequest("GET", path, nil)
var b structs.QueryOptions
resp := httptest.NewRecorder()
if d := a.srv.parseConsistency(resp, req, &b); d {
t.Fatalf("unexpected done")
}
allowStale := maxStale.Nanoseconds() != 0
if b.AllowStale != allowStale {
t.Fatalf("Bad Allow Stale")
}
if maxStale > 0 && b.MaxStaleDuration != maxStale {
t.Fatalf("Bad MaxStaleDuration: %d VS expected %d", b.MaxStaleDuration, maxStale)
}
if b.RequireConsistent != requireConsistent {
t.Fatal("Bad Consistent")
}
}

func TestParseConsistencyAndMaxStale(t *testing.T) {
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()

// Default => Consistent
a.config.DiscoveryMaxStale = time.Duration(0)
ensureConsistency(t, a, "/v1/catalog/nodes", 0, false)
// Stale, without MaxStale
ensureConsistency(t, a, "/v1/catalog/nodes?stale", -1, false)
// Override explicitly
ensureConsistency(t, a, "/v1/catalog/nodes?max_stale=3s", 3*time.Second, false)
ensureConsistency(t, a, "/v1/catalog/nodes?stale&max_stale=3s", 3*time.Second, false)

// stale by defaul on discovery
a.config.DiscoveryMaxStale = time.Duration(7 * time.Second)
ensureConsistency(t, a, "/v1/catalog/nodes", a.config.DiscoveryMaxStale, false)
// Not in KV
ensureConsistency(t, a, "/v1/kv/my/path", 0, false)

// DiscoveryConsistencyLevel should apply
ensureConsistency(t, a, "/v1/health/service/one", a.config.DiscoveryMaxStale, false)
ensureConsistency(t, a, "/v1/catalog/service/one", a.config.DiscoveryMaxStale, false)
ensureConsistency(t, a, "/v1/catalog/services", a.config.DiscoveryMaxStale, false)

// Query path should be taken into account
ensureConsistency(t, a, "/v1/catalog/services?consistent", 0, true)
// Since stale is added, no MaxStale should be applied
ensureConsistency(t, a, "/v1/catalog/services?stale", -1, false)
ensureConsistency(t, a, "/v1/catalog/services?leader", 0, false)
}

func TestParseConsistency_Invalid(t *testing.T) {
t.Parallel()
resp := httptest.NewRecorder()
var b structs.QueryOptions

req, _ := http.NewRequest("GET", "/v1/catalog/nodes?stale&consistent", nil)
if d := parseConsistency(resp, req, &b); !d {
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
if d := a.srv.parseConsistency(resp, req, &b); !d {
t.Fatalf("expected done")
}

Expand Down
Loading

0 comments on commit a67d27c

Please sign in to comment.