diff --git a/command/agent/local.go b/command/agent/local.go index 08257e191fea..b5952ee16e4c 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -368,47 +368,41 @@ func (l *localState) syncChanges() error { l.Lock() defer l.Unlock() - // Sync the checks first. This allows registering the service in the - // same transaction as its checks. - var checkIDs []string - for id, status := range l.checkStatus { + // Sync the services + for id, status := range l.serviceStatus { if status.remoteDelete { - if err := l.deleteCheck(id); err != nil { + if err := l.deleteService(id); err != nil { return err } } else if !status.inSync { - // Cancel a deferred sync - if timer, ok := l.deferCheck[id]; ok { - timer.Stop() - delete(l.deferCheck, id) + if err := l.syncService(id); err != nil { + return err } - - checkIDs = append(checkIDs, id) } else { - l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id) - } - } - if len(checkIDs) > 0 { - if err := l.syncChecks(checkIDs); err != nil { - return err + l.logger.Printf("[DEBUG] agent: Service '%s' in sync", id) } } - // Sync any remaining services. - for id, status := range l.serviceStatus { + // Sync the checks + for id, status := range l.checkStatus { if status.remoteDelete { - if err := l.deleteService(id); err != nil { + if err := l.deleteCheck(id); err != nil { return err } } else if !status.inSync { - if err := l.syncService(id); err != nil { + // Cancel a deferred sync + if timer := l.deferCheck[id]; timer != nil { + timer.Stop() + delete(l.deferCheck, id) + } + + if err := l.syncCheck(id); err != nil { return err } } else { - l.logger.Printf("[DEBUG] agent: Service '%s' in sync", id) + l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id) } } - return nil } @@ -455,79 +449,68 @@ func (l *localState) syncService(id string) error { Service: l.services[id], WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, } + + var checks structs.HealthChecks + for _, check := range l.checks { + if check.ServiceID == id { + if stat, ok := l.checkStatus[check.CheckID]; !ok || !stat.inSync { + checks = append(checks, check) + } + } + } + + if len(checks) == 1 { + req.Check = checks[0] + } else { + req.Checks = checks + } + var out struct{} err := l.iface.RPC("Catalog.Register", &req, &out) if err == nil { l.serviceStatus[id] = syncStatus{inSync: true} l.logger.Printf("[INFO] agent: Synced service '%s'", id) + for _, check := range checks { + l.checkStatus[check.CheckID] = syncStatus{inSync: true} + } } else if strings.Contains(err.Error(), permissionDenied) { l.serviceStatus[id] = syncStatus{inSync: true} l.logger.Printf("[WARN] agent: Service '%s' registration blocked by ACLs", id) + for _, check := range checks { + l.checkStatus[check.CheckID] = syncStatus{inSync: true} + } return nil } return err } -// syncChecks is used to sync checks to the server. If a check is associated -// with a service and the service is out of sync, it will piggyback with the -// sync so that it is updated as part of the same transaction. -func (l *localState) syncChecks(checkIDs []string) error { - checkMap := make(map[string]structs.HealthChecks) - - for _, id := range checkIDs { - if check, ok := l.checks[id]; ok { - checkMap[check.ServiceID] = append(checkMap[check.ServiceID], check) +// syncCheck is used to sync a service to the server +func (l *localState) syncCheck(id string) error { + // Pull in the associated service if any + check := l.checks[id] + var service *structs.NodeService + if check.ServiceID != "" { + if serv, ok := l.services[check.ServiceID]; ok { + service = serv } } - - for serviceID, checks := range checkMap { - // Create the sync request - req := structs.RegisterRequest{ - Datacenter: l.config.Datacenter, - Node: l.config.NodeName, - Address: l.config.AdvertiseAddr, - WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, - } - - // Attach the service if it should also be synced - if service, ok := l.services[serviceID]; ok { - if status, ok := l.serviceStatus[serviceID]; ok && !status.inSync { - req.Service = service - } - } - - // Send single Check element for backwards compat with 0.4.x - if len(checks) == 1 { - req.Check = checks[0] - } else { - req.Checks = checks - } - - // Perform the sync - var out struct{} - if err := l.iface.RPC("Catalog.Register", &req, &out); err != nil { - if strings.Contains(err.Error(), permissionDenied) { - for _, check := range checks { - l.checkStatus[check.CheckID] = syncStatus{inSync: true} - l.logger.Printf( - "[WARN] agent: Check '%s' registration blocked by ACLs", - check.CheckID) - } - return nil - } - return err - } - - // Mark the checks and services as synced - if req.Service != nil { - l.serviceStatus[serviceID] = syncStatus{inSync: true} - l.logger.Printf("[INFO] agent: Synced service '%s'", serviceID) - } - for _, check := range checks { - l.checkStatus[check.CheckID] = syncStatus{inSync: true} - l.logger.Printf("[INFO] agent: Synced check '%s'", check.CheckID) - } + req := structs.RegisterRequest{ + Datacenter: l.config.Datacenter, + Node: l.config.NodeName, + Address: l.config.AdvertiseAddr, + Service: service, + Check: l.checks[id], + WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, } - - return nil + var out struct{} + err := l.iface.RPC("Catalog.Register", &req, &out) + if err == nil { + l.checkStatus[id] = syncStatus{inSync: true} + l.logger.Printf("[INFO] agent: Synced check '%s'", id) + } else if strings.Contains(err.Error(), permissionDenied) { + l.checkStatus[id] = syncStatus{inSync: true} + l.logger.Printf("[WARN] agent: Check '%s' registration blocked by ACLs", id) + return nil + } + return err } diff --git a/command/agent/local_test.go b/command/agent/local_test.go index c92aff446836..b650fef29f38 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -155,6 +155,126 @@ func TestAgentAntiEntropy_Services(t *testing.T) { } } +func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { + conf := nextConfig() + dir, agent := makeAgent(t, conf) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + testutil.WaitForLeader(t, agent.RPC, "dc1") + + { + // Single check + srv := &structs.NodeService{ + ID: "mysql", + Service: "mysql", + Tags: []string{"master"}, + Port: 5000, + } + agent.state.AddService(srv) + + chk := &structs.HealthCheck{ + Node: agent.config.NodeName, + CheckID: "mysql", + Name: "mysql", + ServiceID: "mysql", + Status: structs.HealthPassing, + } + agent.state.AddCheck(chk) + + // Sync the service once + if err := agent.state.syncService("mysql"); err != nil { + t.Fatalf("err: %s", err) + } + + // We should have 2 services (consul included) + svcReq := structs.NodeSpecificRequest{ + Datacenter: "dc1", + Node: agent.config.NodeName, + } + var services structs.IndexedNodeServices + if err := agent.RPC("Catalog.NodeServices", &svcReq, &services); err != nil { + t.Fatalf("err: %v", err) + } + if len(services.NodeServices.Services) != 2 { + t.Fatalf("bad: %v", services.NodeServices.Services) + } + + // We should have one health check + chkReq := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "mysql", + } + var checks structs.IndexedHealthChecks + if err := agent.RPC("Health.ServiceChecks", &chkReq, &checks); err != nil { + t.Fatalf("err: %v", err) + } + if len(checks.HealthChecks) != 1 { + t.Fatalf("bad: %v", checks) + } + } + + { + // Multiple checks + srv := &structs.NodeService{ + ID: "redis", + Service: "redis", + Tags: []string{"master"}, + Port: 5000, + } + agent.state.AddService(srv) + + chk1 := &structs.HealthCheck{ + Node: agent.config.NodeName, + CheckID: "redis:1", + Name: "redis:1", + ServiceID: "redis", + Status: structs.HealthPassing, + } + agent.state.AddCheck(chk1) + + chk2 := &structs.HealthCheck{ + Node: agent.config.NodeName, + CheckID: "redis:2", + Name: "redis:2", + ServiceID: "redis", + Status: structs.HealthPassing, + } + agent.state.AddCheck(chk2) + + // Sync the service once + if err := agent.state.syncService("redis"); err != nil { + t.Fatalf("err: %s", err) + } + + // We should have 3 services (consul included) + svcReq := structs.NodeSpecificRequest{ + Datacenter: "dc1", + Node: agent.config.NodeName, + } + var services structs.IndexedNodeServices + if err := agent.RPC("Catalog.NodeServices", &svcReq, &services); err != nil { + t.Fatalf("err: %v", err) + } + if len(services.NodeServices.Services) != 3 { + t.Fatalf("bad: %v", services.NodeServices.Services) + } + + // We should have two health checks + chkReq := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "redis", + } + var checks structs.IndexedHealthChecks + if err := agent.RPC("Health.ServiceChecks", &chkReq, &checks); err != nil { + t.Fatalf("err: %v", err) + } + if len(checks.HealthChecks) != 2 { + t.Fatalf("bad: %v", checks) + } + } +} + func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { conf := nextConfig() conf.ACLDatacenter = "dc1"