From 674be58e55f3f2b1f1c64ef2f52bfbd577db0c7c Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Tue, 13 Jan 2015 17:52:17 -0800 Subject: [PATCH 1/7] agent: support multiple checks per service --- command/agent/agent.go | 40 ++- command/agent/agent_endpoint.go | 14 +- command/agent/agent_endpoint_test.go | 57 ++++ command/agent/agent_test.go | 263 +++++++++++++----- command/agent/check.go | 1 + command/agent/config.go | 42 +-- command/agent/config_test.go | 38 ++- command/agent/local.go | 132 ++++++--- command/agent/structs.go | 24 +- consul/catalog_endpoint.go | 13 +- consul/state_store.go | 7 +- consul/state_store_test.go | 11 +- consul/structs/structs.go | 1 + .../source/docs/agent/checks.html.markdown | 22 ++ website/source/docs/agent/http.html.markdown | 15 + .../source/docs/agent/services.html.markdown | 39 ++- 16 files changed, 537 insertions(+), 182 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 9454dba4c0ea..e3be1b6cfcdd 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -9,6 +9,7 @@ import ( "os" "path/filepath" "strconv" + "strings" "sync" "github.com/hashicorp/consul/consul" @@ -582,15 +583,17 @@ func (a *Agent) purgeCheck(checkID string) error { // AddService is used to add a service entry. 
// This entry is persistent and the agent will make a best effort to // ensure it is registered -func (a *Agent) AddService(service *structs.NodeService, chkType *CheckType, persist bool) error { +func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, persist bool) error { if service.Service == "" { return fmt.Errorf("Service name missing") } if service.ID == "" && service.Service != "" { service.ID = service.Service } - if chkType != nil && !chkType.Valid() { - return fmt.Errorf("Check type is not valid") + for _, check := range chkTypes { + if !check.Valid() { + return fmt.Errorf("Check type is not valid") + } } // Add the service @@ -604,10 +607,14 @@ func (a *Agent) AddService(service *structs.NodeService, chkType *CheckType, per } // Create an associated health check - if chkType != nil { + for i, chkType := range chkTypes { + checkID := fmt.Sprintf("service:%s", service.ID) + if len(chkTypes) > 1 { + checkID += fmt.Sprintf(":%d", i+1) + } check := &structs.HealthCheck{ Node: a.config.NodeName, - CheckID: fmt.Sprintf("service:%s", service.ID), + CheckID: checkID, Name: fmt.Sprintf("Service '%s' check", service.Service), Status: structs.HealthCritical, Notes: chkType.Notes, @@ -642,9 +649,14 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error { } // Deregister any associated health checks - checkID := fmt.Sprintf("service:%s", serviceID) - if err := a.RemoveCheck(checkID, persist); err != nil { - return err + for checkID, _ := range a.state.Checks() { + prefix := "service:" + serviceID + if checkID != prefix && !strings.HasPrefix(checkID, prefix+":") { + continue + } + if err := a.RemoveCheck(checkID, persist); err != nil { + return err + } } log.Printf("[DEBUG] agent: removed service %q", serviceID) @@ -663,6 +675,14 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist return fmt.Errorf("Check type is not valid") } + if check.ServiceID != "" { + svc, ok := 
a.state.Services()[check.ServiceID] + if !ok { + return fmt.Errorf("ServiceID %q does not exist", check.ServiceID) + } + check.ServiceName = svc.Service + } + a.checkLock.Lock() defer a.checkLock.Unlock() @@ -864,8 +884,8 @@ func (a *Agent) loadServices(conf *Config) error { // Register the services from config for _, service := range conf.Services { ns := service.NodeService() - chkType := service.CheckType() - if err := a.AddService(ns, chkType, false); err != nil { + chkTypes := service.CheckTypes() + if err := a.AddService(ns, chkTypes, false); err != nil { return fmt.Errorf("Failed to register service '%s': %v", service.ID, err) } } diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index 63fa0d21f1c4..c78c2baa34be 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -161,15 +161,17 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re ns := args.NodeService() // Verify the check type - chkType := args.CheckType() - if chkType != nil && !chkType.Valid() { - resp.WriteHeader(400) - resp.Write([]byte("Must provide TTL or Script and Interval!")) - return nil, nil + chkTypes := args.CheckTypes() + for _, check := range chkTypes { + if !check.Valid() { + resp.WriteHeader(400) + resp.Write([]byte("Must provide TTL or Script and Interval!")) + return nil, nil + } } // Add the check - return nil, s.agent.AddService(ns, chkType, true) + return nil, s.agent.AddService(ns, chkTypes, true) } func (s *HTTPServer) AgentDeregisterService(resp http.ResponseWriter, req *http.Request) (interface{}, error) { diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index eba6c28e79cd..f49ab171471c 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -683,3 +683,60 @@ func TestHTTPAgent_DisableNodeMaintenance(t *testing.T) { t.Fatalf("should have removed maintenance check") } } + +func 
TestHTTPAgentRegisterServiceCheck(t *testing.T) { + dir, srv := makeHTTPServer(t) + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + + // First register the service + req, err := http.NewRequest("GET", "/v1/agent/service/register", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + args := &ServiceDefinition{ + Name: "memcache", + Port: 8000, + Check: CheckType{ + TTL: 15 * time.Second, + }, + } + req.Body = encodeReq(args) + + if _, err := srv.AgentRegisterService(nil, req); err != nil { + t.Fatalf("err: %v", err) + } + + // Now register an additional check + req, err = http.NewRequest("GET", "/v1/agent/check/register", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + checkArgs := &CheckDefinition{ + Name: "memcache_check2", + ServiceID: "memcache", + CheckType: CheckType{ + TTL: 15 * time.Second, + }, + } + req.Body = encodeReq(checkArgs) + + if _, err := srv.AgentRegisterCheck(nil, req); err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure we have a check mapping + result := srv.agent.state.Checks() + if _, ok := result["service:memcache"]; !ok { + t.Fatalf("missing memcached check") + } + if _, ok := result["memcache_check2"]; !ok { + t.Fatalf("missing memcache_check2 check") + } + + // Make sure the new check is associated with the service + if result["memcache_check2"].ServiceID != "memcache" { + t.Fatalf("bad: %#v", result["memcached_check2"]) + } +} diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 37e2ebc32318..5279b25c47bf 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -139,39 +139,96 @@ func TestAgent_AddService(t *testing.T) { defer os.RemoveAll(dir) defer agent.Shutdown() - srv := &structs.NodeService{ - ID: "redis", - Service: "redis", - Tags: []string{"foo"}, - Port: 8000, - } - chk := &CheckType{ - TTL: time.Minute, - Notes: "redis health check", - } - err := agent.AddService(srv, chk, false) - if err != nil { - t.Fatalf("err: %v", err) - } + // 
Service registration with a single check + { + srv := &structs.NodeService{ + ID: "redis", + Service: "redis", + Tags: []string{"foo"}, + Port: 8000, + } + chkTypes := CheckTypes{ + &CheckType{ + TTL: time.Minute, + Notes: "redis heath check 2", + }, + } + err := agent.AddService(srv, chkTypes, false) + if err != nil { + t.Fatalf("err: %v", err) + } - // Ensure we have a state mapping - if _, ok := agent.state.Services()["redis"]; !ok { - t.Fatalf("missing redis service") - } + // Ensure we have a state mapping + if _, ok := agent.state.Services()["redis"]; !ok { + t.Fatalf("missing redis service") + } - // Ensure we have a check mapping - if _, ok := agent.state.Checks()["service:redis"]; !ok { - t.Fatalf("missing redis check") - } + // Ensure the check is registered + if _, ok := agent.state.Checks()["service:redis"]; !ok { + t.Fatalf("missing redis check") + } - // Ensure a TTL is setup - if _, ok := agent.checkTTLs["service:redis"]; !ok { - t.Fatalf("missing redis check ttl") + // Ensure a TTL is setup + if _, ok := agent.checkTTLs["service:redis"]; !ok { + t.Fatalf("missing redis check ttl") + } + + // Ensure the notes are passed through + if agent.state.Checks()["service:redis"].Notes == "" { + t.Fatalf("missing redis check notes") + } } - // Ensure the notes are passed through - if agent.state.Checks()["service:redis"].Notes == "" { - t.Fatalf("missing redis check notes") + // Service registration with multiple checks + { + srv := &structs.NodeService{ + ID: "memcache", + Service: "memcache", + Tags: []string{"bar"}, + Port: 8000, + } + chkTypes := CheckTypes{ + &CheckType{ + TTL: time.Minute, + Notes: "memcache health check 1", + }, + &CheckType{ + TTL: time.Second, + Notes: "memcache heath check 2", + }, + } + if err := agent.AddService(srv, chkTypes, false); err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure we have a state mapping + if _, ok := agent.state.Services()["memcache"]; !ok { + t.Fatalf("missing memcache service") + } + + // Ensure both 
checks were added + if _, ok := agent.state.Checks()["service:memcache:1"]; !ok { + t.Fatalf("missing memcache:1 check") + } + if _, ok := agent.state.Checks()["service:memcache:2"]; !ok { + t.Fatalf("missing memcache:2 check") + } + + // Ensure a TTL is setup + if _, ok := agent.checkTTLs["service:memcache:1"]; !ok { + t.Fatalf("missing memcache:1 check ttl") + } + if _, ok := agent.checkTTLs["service:memcache:2"]; !ok { + t.Fatalf("missing memcache:2 check ttl") + } + + // Ensure the notes are passed through + if agent.state.Checks()["service:memcache:1"].Notes == "" { + t.Fatalf("missing redis check notes") + } + if agent.state.Checks()["service:memcache:2"].Notes == "" { + t.Fatalf("missing redis check notes") + } } } @@ -190,34 +247,67 @@ func TestAgent_RemoveService(t *testing.T) { t.Fatalf("should have errored") } - srv := &structs.NodeService{ - ID: "redis", - Service: "redis", - Port: 8000, - } - chk := &CheckType{TTL: time.Minute} - if err := agent.AddService(srv, chk, false); err != nil { - t.Fatalf("err: %v", err) - } + // Removing a service with a single check works + { + srv := &structs.NodeService{ + ID: "memcache", + Service: "memcache", + Port: 8000, + } + chkTypes := CheckTypes{&CheckType{TTL: time.Minute}} - // Remove the service - if err := agent.RemoveService("redis", false); err != nil { - t.Fatalf("err: %v", err) - } + if err := agent.AddService(srv, chkTypes, false); err != nil { + t.Fatalf("err: %v", err) + } - // Ensure we have a state mapping - if _, ok := agent.state.Services()["redis"]; ok { - t.Fatalf("have redis service") + if err := agent.RemoveService("memcache", false); err != nil { + t.Fatalf("err: %s", err) + } + if _, ok := agent.state.Checks()["service:memcache"]; ok { + t.Fatalf("have memcache check") + } } - // Ensure we have a check mapping - if _, ok := agent.state.Checks()["service:redis"]; ok { - t.Fatalf("have redis check") - } + // Removing a service with multiple checks works + { + srv := &structs.NodeService{ + ID: 
"redis", + Service: "redis", + Port: 8000, + } + chkTypes := CheckTypes{ + &CheckType{TTL: time.Minute}, + &CheckType{TTL: 30 * time.Second}, + } + if err := agent.AddService(srv, chkTypes, false); err != nil { + t.Fatalf("err: %v", err) + } - // Ensure a TTL is setup - if _, ok := agent.checkTTLs["service:redis"]; ok { - t.Fatalf("have redis check ttl") + // Remove the service + if err := agent.RemoveService("redis", false); err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure we have a state mapping + if _, ok := agent.state.Services()["redis"]; ok { + t.Fatalf("have redis service") + } + + // Ensure checks were removed + if _, ok := agent.state.Checks()["service:redis:1"]; ok { + t.Fatalf("check redis:1 should be removed") + } + if _, ok := agent.state.Checks()["service:redis:2"]; ok { + t.Fatalf("check redis:2 should be removed") + } + + // Ensure a TTL is setup + if _, ok := agent.checkTTLs["service:redis:1"]; ok { + t.Fatalf("check ttl for redis:1 should be removed") + } + if _, ok := agent.checkTTLs["service:redis:2"]; ok { + t.Fatalf("check ttl for redis:2 should be removed") + } } } @@ -285,6 +375,27 @@ func TestAgent_AddCheck_MinInterval(t *testing.T) { } } +func TestAgent_AddCheck_MissingService(t *testing.T) { + dir, agent := makeAgent(t, nextConfig()) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + health := &structs.HealthCheck{ + Node: "foo", + CheckID: "baz", + Name: "baz check 1", + ServiceID: "baz", + } + chk := &CheckType{ + Script: "exit 0", + Interval: time.Microsecond, + } + err := agent.AddCheck(health, chk, false) + if err == nil || err.Error() != `ServiceID "baz" does not exist` { + t.Fatalf("expected service id error, got: %v", err) + } +} + func TestAgent_RemoveCheck(t *testing.T) { dir, agent := makeAgent(t, nextConfig()) defer os.RemoveAll(dir) @@ -534,12 +645,10 @@ func TestAgent_PersistCheck(t *testing.T) { defer agent.Shutdown() check := &structs.HealthCheck{ - Node: config.NodeName, - CheckID: "service:redis1", - Name: 
"redischeck", - Status: structs.HealthPassing, - ServiceID: "redis", - ServiceName: "redis", + Node: config.NodeName, + CheckID: "mem", + Name: "memory check", + Status: structs.HealthPassing, } chkType := &CheckType{ Script: "/bin/true", @@ -607,12 +716,10 @@ func TestAgent_PurgeCheck(t *testing.T) { defer agent.Shutdown() check := &structs.HealthCheck{ - Node: config.NodeName, - CheckID: "service:redis1", - Name: "redischeck", - Status: structs.HealthPassing, - ServiceID: "redis", - ServiceName: "redis", + Node: config.NodeName, + CheckID: "mem", + Name: "memory check", + Status: structs.HealthPassing, } file := filepath.Join(agent.config.DataDir, checksDir, stringHash(check.CheckID)) @@ -645,12 +752,10 @@ func TestAgent_PurgeCheckOnDuplicate(t *testing.T) { defer agent.Shutdown() check1 := &structs.HealthCheck{ - Node: config.NodeName, - CheckID: "service:redis1", - Name: "redischeck", - Status: structs.HealthPassing, - ServiceID: "redis", - ServiceName: "redis", + Node: config.NodeName, + CheckID: "mem", + Name: "memory check", + Status: structs.HealthPassing, } // First persist the check @@ -661,8 +766,8 @@ func TestAgent_PurgeCheckOnDuplicate(t *testing.T) { // Start again with the check registered in config check2 := &CheckDefinition{ - ID: "service:redis1", - Name: "redischeck", + ID: "mem", + Name: "memory check", Notes: "my cool notes", CheckType: CheckType{ Script: "/bin/check-redis.py", @@ -697,16 +802,26 @@ func TestAgent_unloadChecks(t *testing.T) { defer os.RemoveAll(dir) defer agent.Shutdown() + // First register a service + svc := &structs.NodeService{ + ID: "redis", + Service: "redis", + Tags: []string{"foo"}, + Port: 8000, + } + if err := agent.AddService(svc, nil, false); err != nil { + t.Fatalf("err: %v", err) + } + + // Register a check check1 := &structs.HealthCheck{ Node: config.NodeName, - CheckID: "service:redis1", + CheckID: "service:redis", Name: "redischeck", Status: structs.HealthPassing, ServiceID: "redis", ServiceName: "redis", } - - 
// Register the check if err := agent.AddCheck(check1, nil, false); err != nil { t.Fatalf("err: %s", err) } diff --git a/command/agent/check.go b/command/agent/check.go index 7cb6228f6f79..71aa0eba0ec1 100644 --- a/command/agent/check.go +++ b/command/agent/check.go @@ -40,6 +40,7 @@ type CheckType struct { Notes string } +type CheckTypes []*CheckType // Valid checks if the CheckType is valid func (c *CheckType) Valid() bool { diff --git a/command/agent/config.go b/command/agent/config.go index 92b9e64d3794..2e3ea2eedc84 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -568,7 +568,6 @@ func DecodeConfig(r io.Reader) (*Config, error) { // DecodeServiceDefinition is used to decode a service definition func DecodeServiceDefinition(raw interface{}) (*ServiceDefinition, error) { - var sub interface{} rawMap, ok := raw.(map[string]interface{}) if !ok { goto AFTER_FIX @@ -582,17 +581,23 @@ func DecodeServiceDefinition(raw interface{}) (*ServiceDefinition, error) { } for k, v := range rawMap { - if strings.ToLower(k) == "check" { - sub = v - break + switch strings.ToLower(k) { + case "check": + if err := FixupCheckType(v); err != nil { + return nil, err + } + case "checks": + chkTypes, ok := v.([]interface{}) + if !ok { + goto AFTER_FIX + } + for _, chkType := range chkTypes { + if err := FixupCheckType(chkType); err != nil { + return nil, err + } + } } } - if sub == nil { - goto AFTER_FIX - } - if err := FixupCheckType(sub); err != nil { - return nil, err - } AFTER_FIX: var md mapstructure.Metadata var result ServiceDefinition @@ -610,22 +615,23 @@ AFTER_FIX: } func FixupCheckType(raw interface{}) error { + var ttlKey, intervalKey string + // Handle decoding of time durations rawMap, ok := raw.(map[string]interface{}) if !ok { return nil } - var ttlKey string - for k, _ := range rawMap { - if strings.ToLower(k) == "ttl" { + for k, v := range rawMap { + switch strings.ToLower(k) { + case "ttl": ttlKey = k - } - } - var intervalKey string - for k, _ := 
range rawMap { - if strings.ToLower(k) == "interval" { + case "interval": intervalKey = k + case "service_id": + rawMap["serviceid"] = v + delete(rawMap, "service_id") } } diff --git a/command/agent/config_test.go b/command/agent/config_test.go index fa7bf6f274f6..3647da2d83d7 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -640,7 +640,17 @@ func TestDecodeConfig_Services(t *testing.T) { "script": "/bin/check_redis -p 6000", "interval": "5s", "ttl": "20s" - } + }, + "checks": [ + { + "script": "/bin/check_redis_read", + "interval": "1m" + }, + { + "script": "/bin/check_redis_write", + "interval": "1m" + } + ] }, { "id": "red1", @@ -672,6 +682,16 @@ func TestDecodeConfig_Services(t *testing.T) { Script: "/bin/check_redis -p 6000", TTL: 20 * time.Second, }, + Checks: CheckTypes{ + &CheckType{ + Interval: time.Minute, + Script: "/bin/check_redis_read", + }, + &CheckType{ + Interval: time.Minute, + Script: "/bin/check_redis_write", + }, + }, ID: "red0", Name: "redis", Tags: []string{ @@ -715,6 +735,13 @@ func TestDecodeConfig_Checks(t *testing.T) { "name": "cpu", "script": "/bin/check_cpu", "interval": "10s" + }, + { + "id": "chk3", + "name": "service:redis:tx", + "script": "/bin/check_redis_tx", + "interval": "1m", + "service_id": "redis" } ] }` @@ -742,6 +769,15 @@ func TestDecodeConfig_Checks(t *testing.T) { Interval: 10 * time.Second, }, }, + &CheckDefinition{ + ID: "chk3", + Name: "service:redis:tx", + ServiceID: "redis", + CheckType: CheckType{ + Script: "/bin/check_redis_tx", + Interval: time.Minute, + }, + }, }, } diff --git a/command/agent/local.go b/command/agent/local.go index 822546539dfd..84c5ae0428d2 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -368,41 +368,48 @@ func (l *localState) syncChanges() error { l.Lock() defer l.Unlock() - // Sync the services - for id, status := range l.serviceStatus { + // Sync the checks first. 
This allows registering the service in the + // same transaction as its checks. + var checkIDs []string + for id, status := range l.checkStatus { if status.remoteDelete { - if err := l.deleteService(id); err != nil { + if err := l.deleteCheck(id); err != nil { return err } } else if !status.inSync { - if err := l.syncService(id); err != nil { - return err + // Cancel a deferred sync + if timer, ok := l.deferCheck[id]; ok { + timer.Stop() + delete(l.deferCheck, id) } + + checkIDs = append(checkIDs, id) } else { - l.logger.Printf("[DEBUG] agent: Service '%s' in sync", id) + l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id) + } + + if len(checkIDs) > 0 { + if err := l.syncChecks(checkIDs); err != nil { + return err + } } } - // Sync the checks - for id, status := range l.checkStatus { + // Sync any remaining services. + for id, status := range l.serviceStatus { if status.remoteDelete { - if err := l.deleteCheck(id); err != nil { + if err := l.deleteService(id); err != nil { return err } } else if !status.inSync { - // Cancel a deferred sync - if timer := l.deferCheck[id]; timer != nil { - timer.Stop() - delete(l.deferCheck, id) - } - - if err := l.syncCheck(id); err != nil { + if err := l.syncService(id); err != nil { return err } } else { - l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id) + l.logger.Printf("[DEBUG] agent: Service '%s' in sync", id) } } + return nil } @@ -462,33 +469,72 @@ func (l *localState) syncService(id string) error { return err } -// syncCheck is used to sync a service to the server -func (l *localState) syncCheck(id string) error { - // Pull in the associated service if any - check := l.checks[id] - var service *structs.NodeService - if check.ServiceID != "" { - if serv, ok := l.services[check.ServiceID]; ok { - service = serv +// syncChecks is used to sync checks to the server +func (l *localState) syncChecks(checkIDs []string) error { + reqs := make(map[string]*structs.RegisterRequest) + + for _, id := range checkIDs { + if 
check, ok := l.checks[id]; ok { + // Add checks to the base request if it already exists + if req, ok := reqs[check.ServiceID]; ok { + req.Checks = append(req.Checks, check) + continue + } + + // Pull in the associated service if any + var service *structs.NodeService + if serv, ok := l.services[check.ServiceID]; ok { + service = serv + } + + // Create the base request + reqs[check.ServiceID] = &structs.RegisterRequest{ + Datacenter: l.config.Datacenter, + Node: l.config.NodeName, + Address: l.config.AdvertiseAddr, + Service: service, + Checks: structs.HealthChecks{check}, + WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, + } } } - req := structs.RegisterRequest{ - Datacenter: l.config.Datacenter, - Node: l.config.NodeName, - Address: l.config.AdvertiseAddr, - Service: service, - Check: l.checks[id], - WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, - } - var out struct{} - err := l.iface.RPC("Catalog.Register", &req, &out) - if err == nil { - l.checkStatus[id] = syncStatus{inSync: true} - l.logger.Printf("[INFO] agent: Synced check '%s'", id) - } else if strings.Contains(err.Error(), permissionDenied) { - l.checkStatus[id] = syncStatus{inSync: true} - l.logger.Printf("[WARN] agent: Check '%s' registration blocked by ACLs", id) - return nil + + for _, req := range reqs { + // Send check data as Check for backward compatibility if we only have a + // single check. Otherwise, send it as Checks + if len(req.Checks) == 1 { + req.Check = req.Checks[0] + req.Checks = nil + } + + // Perform the sync + var out struct{} + err := l.iface.RPC("Catalog.Register", &req, &out) + if err == nil { + for _, id := range checkIDs { + l.checkStatus[id] = syncStatus{inSync: true} + l.logger.Printf("[INFO] agent: Synced check '%s'", id) + } + + // If the check was associated with a service and we synced it, + // then mark the service as in sync. 
+ if svc := req.Service; svc != nil { + if status, ok := l.serviceStatus[svc.ID]; ok && status.inSync { + continue + } + l.serviceStatus[svc.ID] = syncStatus{inSync: true} + l.logger.Printf("[INFO] agent: Synced service '%s'", svc.ID) + } + } else if strings.Contains(err.Error(), permissionDenied) { + for _, id := range checkIDs { + l.checkStatus[id] = syncStatus{inSync: true} + l.logger.Printf("[WARN] agent: Check '%s' registration blocked by ACLs", id) + } + return nil + } else { + return err + } } - return err + + return nil } diff --git a/command/agent/structs.go b/command/agent/structs.go index 86629e543f7d..ebb2ddea7321 100644 --- a/command/agent/structs.go +++ b/command/agent/structs.go @@ -12,6 +12,7 @@ type ServiceDefinition struct { Address string Port int Check CheckType + Checks CheckTypes } func (s *ServiceDefinition) NodeService() *structs.NodeService { @@ -28,11 +29,14 @@ func (s *ServiceDefinition) NodeService() *structs.NodeService { return ns } -func (s *ServiceDefinition) CheckType() *CheckType { - if s.Check.Script == "" && s.Check.Interval == 0 && s.Check.TTL == 0 { - return nil +func (s *ServiceDefinition) CheckTypes() (checks CheckTypes) { + s.Checks = append(s.Checks, &s.Check) + for _, check := range s.Checks { + if (check.Script != "" && check.Interval != 0) || check.TTL != 0 { + checks = append(checks, check) + } } - return &s.Check + return } // ChecKDefinition is used to JSON decode the Check definitions @@ -40,16 +44,18 @@ type CheckDefinition struct { ID string Name string Notes string + ServiceID string CheckType `mapstructure:",squash"` } func (c *CheckDefinition) HealthCheck(node string) *structs.HealthCheck { health := &structs.HealthCheck{ - Node: node, - CheckID: c.ID, - Name: c.Name, - Status: structs.HealthCritical, - Notes: c.Notes, + Node: node, + CheckID: c.ID, + Name: c.Name, + Status: structs.HealthCritical, + Notes: c.Notes, + ServiceID: c.ServiceID, } if health.CheckID == "" && health.Name != "" { health.CheckID = 
health.Name diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 24995fdeb4db..b39c19f2bd7a 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -53,11 +53,15 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error } if args.Check != nil { - if args.Check.CheckID == "" && args.Check.Name != "" { - args.Check.CheckID = args.Check.Name + args.Checks = append(args.Checks, args.Check) + args.Check = nil + } + for _, check := range args.Checks { + if check.CheckID == "" && check.Name != "" { + check.CheckID = check.Name } - if args.Check.Node == "" { - args.Check.Node = args.Node + if check.Node == "" { + check.Node = args.Node } } @@ -66,6 +70,7 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error c.srv.logger.Printf("[ERR] consul.catalog: Register failed: %v", err) return err } + return nil } diff --git a/consul/state_store.go b/consul/state_store.go index c65152cf699a..d8783bf18912 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -497,12 +497,17 @@ func (s *StateStore) EnsureRegistration(index uint64, req *structs.RegisterReque } } - // Ensure the check if provided + // Ensure the check(s), if provided if req.Check != nil { if err := s.ensureCheckTxn(index, req.Check, tx); err != nil { return err } } + for _, check := range req.Checks { + if err := s.ensureCheckTxn(index, check, tx); err != nil { + return err + } + } // Commit as one unit return tx.Commit() diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 589f7b3781c3..1c605c8756a4 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -32,6 +32,15 @@ func TestEnsureRegistration(t *testing.T) { Status: structs.HealthPassing, ServiceID: "api", }, + Checks: structs.HealthChecks{ + &structs.HealthCheck{ + Node: "foo", + CheckID: "api-cache", + Name: "Can cache stuff", + Status: structs.HealthPassing, + ServiceID: "api", + }, + }, } if err := 
store.EnsureRegistration(13, reg); err != nil { @@ -60,7 +69,7 @@ func TestEnsureRegistration(t *testing.T) { if idx != 13 { t.Fatalf("bad: %v", idx) } - if len(checks) != 1 { + if len(checks) != 2 { t.Fatalf("check: %#v", checks) } } diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 0d4778de73eb..9ebaefb33fcf 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -137,6 +137,7 @@ type RegisterRequest struct { Address string Service *NodeService Check *HealthCheck + Checks HealthChecks WriteRequest } diff --git a/website/source/docs/agent/checks.html.markdown b/website/source/docs/agent/checks.html.markdown index 2102ebb607b9..0f84c32e5f23 100644 --- a/website/source/docs/agent/checks.html.markdown +++ b/website/source/docs/agent/checks.html.markdown @@ -104,6 +104,28 @@ This is the only convention that Consul depends on. Any output of the script will be captured and stored in the `notes` field so that it can be viewed by human operators. +## Service-bound checks + +Health checks may also be optionally bound to a specific service. This ensures +that the status of the health check will only affect the health status of the +given service instead of the entire node. Service-bound health checks may be +provided by adding a `service_id` field to a check configuration: + +```javascript +{ + "check": { + "id": "web-app", + "name": "Web App Status", + "service_id": "web-app", + "ttl": "30s" + } +} +``` + +In the above configuration, if the web-app health check begins failing, it will +only affect the availability of the web-app service and no other services +provided by the node. 
+ ## Multiple Check Definitions Multiple check definitions can be provided at once using the `checks` (plural) diff --git a/website/source/docs/agent/http.html.markdown b/website/source/docs/agent/http.html.markdown index cdb4f9db3417..700984a7a5fc 100644 --- a/website/source/docs/agent/http.html.markdown +++ b/website/source/docs/agent/http.html.markdown @@ -469,6 +469,21 @@ An `HTTP` check will preform an HTTP GET request to the value of `HTTP` (expecte If a `TTL` type is used, then the TTL update APIs must be used to periodically update the state of the check. +It is also possible to associate a new check with an existing service registered +on the agent by providing an additional `ServiceID` field. This type of request +must look like: + +```javascript +{ + "ID": "service:redis:tx", + "ServiceID": "redis", + "Name": "Redis test transaction", + "Notes": "Tests Redis SET, GET, and DELETE", + "Script": "/usr/local/bin/check_redis_tx.py", + "Interval": "1m" +} +``` + The return code is 200 on success. ### /v1/agent/check/deregister/\ diff --git a/website/source/docs/agent/services.html.markdown b/website/source/docs/agent/services.html.markdown index 230de7594a1e..95572986de9d 100644 --- a/website/source/docs/agent/services.html.markdown +++ b/website/source/docs/agent/services.html.markdown @@ -26,10 +26,12 @@ A service definition that is a script looks like: "tags": ["master"], "address": "127.0.0.1", "port": 8000, - "check": { - "script": "/usr/local/bin/check_redis.py", - "interval": "10s" - } + "checks": [ + { + "script": "/usr/local/bin/check_redis.py", + "interval": "10s" + } + ] } } ``` @@ -58,7 +60,10 @@ There is more information about [checks here](/docs/agent/checks.html). The check must be of the script, HTTP or TTL type. If it is a script type, `script` and `interval` must be provided. If it is a HTTP type, `http` and `interval` must be provided. If it is a TTL type, then only `ttl` must be -provided. 
The check name is automatically generated as "service:<service-id>". +provided. The check name is automatically generated as +`service:<service-id>`. If there are multiple service checks registered, the +ID will be generated as `service:<service-id>:<num>`, where `<num>` is an +incrementing number starting from `1`. To configure a service, either provide it as a `-config-file` option to the agent, or place it inside the `-config-dir` of the agent. The file must @@ -82,11 +87,13 @@ Multiple services definitions can be provided at once using the `services` ], "address": "127.0.0.1", "port": 6000, - "check": { - "script": "/bin/check_redis -p 6000", - "interval": "5s", - "ttl": "20s" - } + "checks": [ + { + "script": "/bin/check_redis -p 6000", + "interval": "5s", + "ttl": "20s" + } + ] }, { "id": "red1", @@ -97,11 +104,13 @@ Multiple services definitions can be provided at once using the `services` ], "address": "127.0.0.1", "port": 7000, - "check": { - "script": "/bin/check_redis -p 7000", - "interval": "30s", - "ttl": "60s" - } + "checks": [ + { + "script": "/bin/check_redis -p 7000", + "interval": "30s", + "ttl": "60s" + } + ] }, ... 
] From 2a7211cd5d109c8231f04bb76f21b321610e1d92 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Tue, 13 Jan 2015 19:08:30 -0800 Subject: [PATCH 2/7] agent: support adding multiple checks during service registration from the API --- command/agent/agent_endpoint.go | 24 ++++++++++++++++-------- command/agent/agent_endpoint_test.go | 17 +++++++++++++---- 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index c78c2baa34be..91dd4217d6ef 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -132,17 +132,25 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re return nil } - var check interface{} for k, v := range rawMap { - if strings.ToLower(k) == "check" { - check = v + switch strings.ToLower(k) { + case "check": + if err := FixupCheckType(v); err != nil { + return err + } + case "checks": + chkTypes, ok := v.([]interface{}) + if !ok { + return nil + } + for _, chkType := range chkTypes { + if err := FixupCheckType(chkType); err != nil { + return err + } + } } } - if check == nil { - return nil - } - - return FixupCheckType(check) + return nil } if err := decodeBody(req, &args, decodeCB); err != nil { resp.WriteHeader(400) diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index f49ab171471c..189b90a3c1cc 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -430,6 +430,14 @@ func TestHTTPAgentRegisterService(t *testing.T) { Check: CheckType{ TTL: 15 * time.Second, }, + Checks: CheckTypes{ + &CheckType{ + TTL: 20 * time.Second, + }, + &CheckType{ + TTL: 30 * time.Second, + }, + }, } req.Body = encodeReq(args) @@ -447,12 +455,13 @@ func TestHTTPAgentRegisterService(t *testing.T) { } // Ensure we have a check mapping - if _, ok := srv.agent.state.Checks()["service:test"]; !ok { - t.Fatalf("missing test check") + checks := srv.agent.state.Checks() + if 
len(checks) != 3 { + t.Fatalf("bad: %v", checks) } - if _, ok := srv.agent.checkTTLs["service:test"]; !ok { - t.Fatalf("missing test check ttl") + if len(srv.agent.checkTTLs) != 3 { + t.Fatalf("missing test check ttls: %v", srv.agent.checkTTLs) } } From 197a5a9a9abcca189fd36df3a9fc21791daf3760 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Tue, 13 Jan 2015 19:18:46 -0800 Subject: [PATCH 3/7] api: support multiple checks during service/check registration --- api/agent.go | 19 ++++++----- api/agent_test.go | 81 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 8 deletions(-) diff --git a/api/agent.go b/api/agent.go index c31395e1891b..b71f2c32d7a9 100644 --- a/api/agent.go +++ b/api/agent.go @@ -41,18 +41,20 @@ type AgentMember struct { // AgentServiceRegistration is used to register a new service type AgentServiceRegistration struct { - ID string `json:",omitempty"` - Name string `json:",omitempty"` - Tags []string `json:",omitempty"` - Port int `json:",omitempty"` - Check *AgentServiceCheck + ID string `json:",omitempty"` + Name string `json:",omitempty"` + Tags []string `json:",omitempty"` + Port int `json:",omitempty"` + Check *AgentServiceCheck + Checks AgentServiceChecks } // AgentCheckRegistration is used to register a new check type AgentCheckRegistration struct { - ID string `json:",omitempty"` - Name string `json:",omitempty"` - Notes string `json:",omitempty"` + ID string `json:",omitempty"` + Name string `json:",omitempty"` + Notes string `json:",omitempty"` + ServiceID string `json:",omitempty"` AgentServiceCheck } @@ -63,6 +65,7 @@ type AgentServiceCheck struct { Interval string `json:",omitempty"` TTL string `json:",omitempty"` } +type AgentServiceChecks []*AgentServiceCheck // Agent can be used to query the Agent endpoints type Agent struct { diff --git a/api/agent_test.go b/api/agent_test.go index 06f8ca12e1ed..627256c23057 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -76,6 +76,49 @@ func 
TestAgent_Services(t *testing.T) { } } +func TestAgent_Services_MultipleChecks(t *testing.T) { + c, s := makeClient(t) + defer s.stop() + + agent := c.Agent() + + reg := &AgentServiceRegistration{ + Name: "foo", + Tags: []string{"bar", "baz"}, + Port: 8000, + Checks: AgentServiceChecks{ + &AgentServiceCheck{ + TTL: "15s", + }, + &AgentServiceCheck{ + TTL: "30s", + }, + }, + } + if err := agent.ServiceRegister(reg); err != nil { + t.Fatalf("err: %v", err) + } + + services, err := agent.Services() + if err != nil { + t.Fatalf("err: %v", err) + } + if _, ok := services["foo"]; !ok { + t.Fatalf("missing service: %v", services) + } + + checks, err := agent.Checks() + if err != nil { + t.Fatalf("err: %v", err) + } + if _, ok := checks["service:foo:1"]; !ok { + t.Fatalf("missing check: %v", checks) + } + if _, ok := checks["service:foo:2"]; !ok { + t.Fatalf("missing check: %v", checks) + } +} + func TestAgent_SetTTLStatus(t *testing.T) { c, s := makeClient(t) defer s.stop() @@ -143,6 +186,44 @@ func TestAgent_Checks(t *testing.T) { } } +func TestAgent_Checks_serviceBound(t *testing.T) { + c, s := makeClient(t) + defer s.stop() + + agent := c.Agent() + + // First register a service + serviceReg := &AgentServiceRegistration{ + Name: "redis", + } + if err := agent.ServiceRegister(serviceReg); err != nil { + t.Fatalf("err: %v", err) + } + + // Register a check bound to the service + reg := &AgentCheckRegistration{ + Name: "redischeck", + ServiceID: "redis", + } + reg.TTL = "15s" + if err := agent.CheckRegister(reg); err != nil { + t.Fatalf("err: %v", err) + } + + checks, err := agent.Checks() + if err != nil { + t.Fatalf("err: %v", err) + } + + check, ok := checks["redischeck"] + if !ok { + t.Fatalf("missing check: %v", checks) + } + if check.ServiceID != "redis" { + t.Fatalf("missing service association for check: %v", check) + } +} + func TestAgent_Join(t *testing.T) { c, s := makeClient(t) defer s.stop() From 949ddefbc8332c52b132be1e2de54cc77a3e89c1 Mon Sep 17 00:00:00 
2001 From: Ryan Uber Date: Tue, 13 Jan 2015 23:23:52 -0800 Subject: [PATCH 4/7] agent: refactor syncChecks --- command/agent/local.go | 96 +++++++++++++++++++----------------------- 1 file changed, 43 insertions(+), 53 deletions(-) diff --git a/command/agent/local.go b/command/agent/local.go index 84c5ae0428d2..266a924d8974 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -387,11 +387,10 @@ func (l *localState) syncChanges() error { } else { l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id) } - - if len(checkIDs) > 0 { - if err := l.syncChecks(checkIDs); err != nil { - return err - } + } + if len(checkIDs) > 0 { + if err := l.syncChecks(checkIDs); err != nil { + return err } } @@ -471,69 +470,60 @@ func (l *localState) syncService(id string) error { // syncChecks is used to sync checks to the server func (l *localState) syncChecks(checkIDs []string) error { - reqs := make(map[string]*structs.RegisterRequest) + checkMap := make(map[string]structs.HealthChecks) for _, id := range checkIDs { if check, ok := l.checks[id]; ok { - // Add checks to the base request if it already exists - if req, ok := reqs[check.ServiceID]; ok { - req.Checks = append(req.Checks, check) - continue - } + checkMap[check.ServiceID] = append(checkMap[check.ServiceID], check) + } + } - // Pull in the associated service if any - var service *structs.NodeService - if serv, ok := l.services[check.ServiceID]; ok { - service = serv - } + for serviceID, checks := range checkMap { + service := l.services[serviceID] - // Create the base request - reqs[check.ServiceID] = &structs.RegisterRequest{ - Datacenter: l.config.Datacenter, - Node: l.config.NodeName, - Address: l.config.AdvertiseAddr, - Service: service, - Checks: structs.HealthChecks{check}, - WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, - } + // Create the sync request + req := structs.RegisterRequest{ + Datacenter: l.config.Datacenter, + Node: l.config.NodeName, + Address: l.config.AdvertiseAddr, + 
Service: service, + WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, } - } - for _, req := range reqs { - // Send check data as Check for backward compatibility if we only have a - // single check. Otherwise, send it as Checks - if len(req.Checks) == 1 { - req.Check = req.Checks[0] - req.Checks = nil + // Send single Check element for backwards compat with 0.4.x + if len(checks) == 1 { + req.Check = checks[0] + } else { + req.Checks = checks } // Perform the sync var out struct{} - err := l.iface.RPC("Catalog.Register", &req, &out) - if err == nil { - for _, id := range checkIDs { - l.checkStatus[id] = syncStatus{inSync: true} - l.logger.Printf("[INFO] agent: Synced check '%s'", id) - } - - // If the check was associated with a service and we synced it, - // then mark the service as in sync. - if svc := req.Service; svc != nil { - if status, ok := l.serviceStatus[svc.ID]; ok && status.inSync { - continue + if err := l.iface.RPC("Catalog.Register", &req, &out); err != nil { + if strings.Contains(err.Error(), permissionDenied) { + for _, check := range checks { + l.checkStatus[check.CheckID] = syncStatus{inSync: true} + l.logger.Printf( + "[WARN] agent: Check '%s' registration blocked by ACLs", + check.CheckID) } - l.serviceStatus[svc.ID] = syncStatus{inSync: true} - l.logger.Printf("[INFO] agent: Synced service '%s'", svc.ID) - } - } else if strings.Contains(err.Error(), permissionDenied) { - for _, id := range checkIDs { - l.checkStatus[id] = syncStatus{inSync: true} - l.logger.Printf("[WARN] agent: Check '%s' registration blocked by ACLs", id) + return nil } - return nil - } else { return err } + + // Mark the checks and services as synced + for _, check := range checks { + l.checkStatus[check.CheckID] = syncStatus{inSync: true} + l.logger.Printf("[INFO] agent: Synced check '%s'", check.CheckID) + } + if service != nil { + if status, ok := l.serviceStatus[serviceID]; ok && status.inSync { + continue + } + l.serviceStatus[serviceID] = 
syncStatus{inSync: true} + l.logger.Printf("[INFO] agent: Synced service '%s'", serviceID) + } } return nil From 0c31e5851c984467a023a70e48592bce67b808e9 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Wed, 14 Jan 2015 09:56:36 -0800 Subject: [PATCH 5/7] agent: only send service with check sync if it is out of sync --- command/agent/local.go | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/command/agent/local.go b/command/agent/local.go index 266a924d8974..08257e191fea 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -468,7 +468,9 @@ func (l *localState) syncService(id string) error { return err } -// syncChecks is used to sync checks to the server +// syncChecks is used to sync checks to the server. If a check is associated +// with a service and the service is out of sync, it will piggyback with the +// sync so that it is updated as part of the same transaction. func (l *localState) syncChecks(checkIDs []string) error { checkMap := make(map[string]structs.HealthChecks) @@ -479,17 +481,21 @@ func (l *localState) syncChecks(checkIDs []string) error { } for serviceID, checks := range checkMap { - service := l.services[serviceID] - // Create the sync request req := structs.RegisterRequest{ Datacenter: l.config.Datacenter, Node: l.config.NodeName, Address: l.config.AdvertiseAddr, - Service: service, WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, } + // Attach the service if it should also be synced + if service, ok := l.services[serviceID]; ok { + if status, ok := l.serviceStatus[serviceID]; ok && !status.inSync { + req.Service = service + } + } + // Send single Check element for backwards compat with 0.4.x if len(checks) == 1 { req.Check = checks[0] @@ -513,17 +519,14 @@ func (l *localState) syncChecks(checkIDs []string) error { } // Mark the checks and services as synced + if req.Service != nil { + l.serviceStatus[serviceID] = syncStatus{inSync: true} + l.logger.Printf("[INFO] agent: 
Synced service '%s'", serviceID) + } for _, check := range checks { l.checkStatus[check.CheckID] = syncStatus{inSync: true} l.logger.Printf("[INFO] agent: Synced check '%s'", check.CheckID) } - if service != nil { - if status, ok := l.serviceStatus[serviceID]; ok && status.inSync { - continue - } - l.serviceStatus[serviceID] = syncStatus{inSync: true} - l.logger.Printf("[INFO] agent: Synced service '%s'", serviceID) - } } return nil From a4039aaa4d4f0e4eceb660beacc18a7bc654754d Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Wed, 14 Jan 2015 11:48:36 -0800 Subject: [PATCH 6/7] agent: simplify anti-entropy of services with multiple checks, add tests --- command/agent/local.go | 147 ++++++++++++++++-------------------- command/agent/local_test.go | 120 +++++++++++++++++++++++++++++ 2 files changed, 185 insertions(+), 82 deletions(-) diff --git a/command/agent/local.go b/command/agent/local.go index 08257e191fea..b5952ee16e4c 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -368,47 +368,41 @@ func (l *localState) syncChanges() error { l.Lock() defer l.Unlock() - // Sync the checks first. This allows registering the service in the - // same transaction as its checks. - var checkIDs []string - for id, status := range l.checkStatus { + // Sync the services + for id, status := range l.serviceStatus { if status.remoteDelete { - if err := l.deleteCheck(id); err != nil { + if err := l.deleteService(id); err != nil { return err } } else if !status.inSync { - // Cancel a deferred sync - if timer, ok := l.deferCheck[id]; ok { - timer.Stop() - delete(l.deferCheck, id) + if err := l.syncService(id); err != nil { + return err } - - checkIDs = append(checkIDs, id) } else { - l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id) - } - } - if len(checkIDs) > 0 { - if err := l.syncChecks(checkIDs); err != nil { - return err + l.logger.Printf("[DEBUG] agent: Service '%s' in sync", id) } } - // Sync any remaining services. 
- for id, status := range l.serviceStatus { + // Sync the checks + for id, status := range l.checkStatus { if status.remoteDelete { - if err := l.deleteService(id); err != nil { + if err := l.deleteCheck(id); err != nil { return err } } else if !status.inSync { - if err := l.syncService(id); err != nil { + // Cancel a deferred sync + if timer := l.deferCheck[id]; timer != nil { + timer.Stop() + delete(l.deferCheck, id) + } + + if err := l.syncCheck(id); err != nil { return err } } else { - l.logger.Printf("[DEBUG] agent: Service '%s' in sync", id) + l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id) } } - return nil } @@ -455,79 +449,68 @@ func (l *localState) syncService(id string) error { Service: l.services[id], WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, } + + var checks structs.HealthChecks + for _, check := range l.checks { + if check.ServiceID == id { + if stat, ok := l.checkStatus[check.CheckID]; !ok || !stat.inSync { + checks = append(checks, check) + } + } + } + + if len(checks) == 1 { + req.Check = checks[0] + } else { + req.Checks = checks + } + var out struct{} err := l.iface.RPC("Catalog.Register", &req, &out) if err == nil { l.serviceStatus[id] = syncStatus{inSync: true} l.logger.Printf("[INFO] agent: Synced service '%s'", id) + for _, check := range checks { + l.checkStatus[check.CheckID] = syncStatus{inSync: true} + } } else if strings.Contains(err.Error(), permissionDenied) { l.serviceStatus[id] = syncStatus{inSync: true} l.logger.Printf("[WARN] agent: Service '%s' registration blocked by ACLs", id) + for _, check := range checks { + l.checkStatus[check.CheckID] = syncStatus{inSync: true} + } return nil } return err } -// syncChecks is used to sync checks to the server. If a check is associated -// with a service and the service is out of sync, it will piggyback with the -// sync so that it is updated as part of the same transaction. 
-func (l *localState) syncChecks(checkIDs []string) error { - checkMap := make(map[string]structs.HealthChecks) - - for _, id := range checkIDs { - if check, ok := l.checks[id]; ok { - checkMap[check.ServiceID] = append(checkMap[check.ServiceID], check) +// syncCheck is used to sync a service to the server +func (l *localState) syncCheck(id string) error { + // Pull in the associated service if any + check := l.checks[id] + var service *structs.NodeService + if check.ServiceID != "" { + if serv, ok := l.services[check.ServiceID]; ok { + service = serv } } - - for serviceID, checks := range checkMap { - // Create the sync request - req := structs.RegisterRequest{ - Datacenter: l.config.Datacenter, - Node: l.config.NodeName, - Address: l.config.AdvertiseAddr, - WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, - } - - // Attach the service if it should also be synced - if service, ok := l.services[serviceID]; ok { - if status, ok := l.serviceStatus[serviceID]; ok && !status.inSync { - req.Service = service - } - } - - // Send single Check element for backwards compat with 0.4.x - if len(checks) == 1 { - req.Check = checks[0] - } else { - req.Checks = checks - } - - // Perform the sync - var out struct{} - if err := l.iface.RPC("Catalog.Register", &req, &out); err != nil { - if strings.Contains(err.Error(), permissionDenied) { - for _, check := range checks { - l.checkStatus[check.CheckID] = syncStatus{inSync: true} - l.logger.Printf( - "[WARN] agent: Check '%s' registration blocked by ACLs", - check.CheckID) - } - return nil - } - return err - } - - // Mark the checks and services as synced - if req.Service != nil { - l.serviceStatus[serviceID] = syncStatus{inSync: true} - l.logger.Printf("[INFO] agent: Synced service '%s'", serviceID) - } - for _, check := range checks { - l.checkStatus[check.CheckID] = syncStatus{inSync: true} - l.logger.Printf("[INFO] agent: Synced check '%s'", check.CheckID) - } + req := structs.RegisterRequest{ + Datacenter: 
l.config.Datacenter, + Node: l.config.NodeName, + Address: l.config.AdvertiseAddr, + Service: service, + Check: l.checks[id], + WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, } - - return nil + var out struct{} + err := l.iface.RPC("Catalog.Register", &req, &out) + if err == nil { + l.checkStatus[id] = syncStatus{inSync: true} + l.logger.Printf("[INFO] agent: Synced check '%s'", id) + } else if strings.Contains(err.Error(), permissionDenied) { + l.checkStatus[id] = syncStatus{inSync: true} + l.logger.Printf("[WARN] agent: Check '%s' registration blocked by ACLs", id) + return nil + } + return err } diff --git a/command/agent/local_test.go b/command/agent/local_test.go index c92aff446836..b650fef29f38 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -155,6 +155,126 @@ func TestAgentAntiEntropy_Services(t *testing.T) { } } +func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { + conf := nextConfig() + dir, agent := makeAgent(t, conf) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + testutil.WaitForLeader(t, agent.RPC, "dc1") + + { + // Single check + srv := &structs.NodeService{ + ID: "mysql", + Service: "mysql", + Tags: []string{"master"}, + Port: 5000, + } + agent.state.AddService(srv) + + chk := &structs.HealthCheck{ + Node: agent.config.NodeName, + CheckID: "mysql", + Name: "mysql", + ServiceID: "mysql", + Status: structs.HealthPassing, + } + agent.state.AddCheck(chk) + + // Sync the service once + if err := agent.state.syncService("mysql"); err != nil { + t.Fatalf("err: %s", err) + } + + // We should have 2 services (consul included) + svcReq := structs.NodeSpecificRequest{ + Datacenter: "dc1", + Node: agent.config.NodeName, + } + var services structs.IndexedNodeServices + if err := agent.RPC("Catalog.NodeServices", &svcReq, &services); err != nil { + t.Fatalf("err: %v", err) + } + if len(services.NodeServices.Services) != 2 { + t.Fatalf("bad: %v", services.NodeServices.Services) + } + + // We should have 
one health check + chkReq := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "mysql", + } + var checks structs.IndexedHealthChecks + if err := agent.RPC("Health.ServiceChecks", &chkReq, &checks); err != nil { + t.Fatalf("err: %v", err) + } + if len(checks.HealthChecks) != 1 { + t.Fatalf("bad: %v", checks) + } + } + + { + // Multiple checks + srv := &structs.NodeService{ + ID: "redis", + Service: "redis", + Tags: []string{"master"}, + Port: 5000, + } + agent.state.AddService(srv) + + chk1 := &structs.HealthCheck{ + Node: agent.config.NodeName, + CheckID: "redis:1", + Name: "redis:1", + ServiceID: "redis", + Status: structs.HealthPassing, + } + agent.state.AddCheck(chk1) + + chk2 := &structs.HealthCheck{ + Node: agent.config.NodeName, + CheckID: "redis:2", + Name: "redis:2", + ServiceID: "redis", + Status: structs.HealthPassing, + } + agent.state.AddCheck(chk2) + + // Sync the service once + if err := agent.state.syncService("redis"); err != nil { + t.Fatalf("err: %s", err) + } + + // We should have 3 services (consul included) + svcReq := structs.NodeSpecificRequest{ + Datacenter: "dc1", + Node: agent.config.NodeName, + } + var services structs.IndexedNodeServices + if err := agent.RPC("Catalog.NodeServices", &svcReq, &services); err != nil { + t.Fatalf("err: %v", err) + } + if len(services.NodeServices.Services) != 3 { + t.Fatalf("bad: %v", services.NodeServices.Services) + } + + // We should have two health checks + chkReq := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "redis", + } + var checks structs.IndexedHealthChecks + if err := agent.RPC("Health.ServiceChecks", &chkReq, &checks); err != nil { + t.Fatalf("err: %v", err) + } + if len(checks.HealthChecks) != 2 { + t.Fatalf("bad: %v", checks) + } + } +} + func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { conf := nextConfig() conf.ACLDatacenter = "dc1" From 46d5dcfc17a590279836727ada54286b67aa606d Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Wed, 14 Jan 2015 
23:09:42 -0800 Subject: [PATCH 7/7] agent: comments for new anti-entropy functionality --- command/agent/local.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/command/agent/local.go b/command/agent/local.go index b5952ee16e4c..1a394f4e9fdf 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -450,6 +450,9 @@ func (l *localState) syncService(id string) error { WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, } + // If the service has associated checks that are out of sync, + // piggyback them on the service sync so they are part of the + // same transaction and are registered atomically. var checks structs.HealthChecks for _, check := range l.checks { if check.ServiceID == id { @@ -459,6 +462,7 @@ func (l *localState) syncService(id string) error { } } + // Backwards-compatibility for Consul < 0.5 if len(checks) == 1 { req.Check = checks[0] } else {