Skip to content

Commit

Permalink
agent: simplify anti-entropy of services with multiple checks, add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanuber committed Jan 15, 2015
1 parent ce77243 commit 1c64970
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 82 deletions.
147 changes: 65 additions & 82 deletions command/agent/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,47 +368,41 @@ func (l *localState) syncChanges() error {
l.Lock()
defer l.Unlock()

// Sync the checks first. This allows registering the service in the
// same transaction as its checks.
var checkIDs []string
for id, status := range l.checkStatus {
// Sync the services
for id, status := range l.serviceStatus {
if status.remoteDelete {
if err := l.deleteCheck(id); err != nil {
if err := l.deleteService(id); err != nil {
return err
}
} else if !status.inSync {
// Cancel a deferred sync
if timer, ok := l.deferCheck[id]; ok {
timer.Stop()
delete(l.deferCheck, id)
if err := l.syncService(id); err != nil {
return err
}

checkIDs = append(checkIDs, id)
} else {
l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id)
}
}
if len(checkIDs) > 0 {
if err := l.syncChecks(checkIDs); err != nil {
return err
l.logger.Printf("[DEBUG] agent: Service '%s' in sync", id)
}
}

// Sync any remaining services.
for id, status := range l.serviceStatus {
// Sync the checks
for id, status := range l.checkStatus {
if status.remoteDelete {
if err := l.deleteService(id); err != nil {
if err := l.deleteCheck(id); err != nil {
return err
}
} else if !status.inSync {
if err := l.syncService(id); err != nil {
// Cancel a deferred sync
if timer := l.deferCheck[id]; timer != nil {
timer.Stop()
delete(l.deferCheck, id)
}

if err := l.syncCheck(id); err != nil {
return err
}
} else {
l.logger.Printf("[DEBUG] agent: Service '%s' in sync", id)
l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id)
}
}

return nil
}

Expand Down Expand Up @@ -455,79 +449,68 @@ func (l *localState) syncService(id string) error {
Service: l.services[id],
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken},
}

var checks structs.HealthChecks
for _, check := range l.checks {
if check.ServiceID == id {
if stat, ok := l.checkStatus[check.CheckID]; !ok || !stat.inSync {
checks = append(checks, check)
}
}
}

if len(checks) == 1 {
req.Check = checks[0]
} else {
req.Checks = checks
}

var out struct{}
err := l.iface.RPC("Catalog.Register", &req, &out)
if err == nil {
l.serviceStatus[id] = syncStatus{inSync: true}
l.logger.Printf("[INFO] agent: Synced service '%s'", id)
for _, check := range checks {
l.checkStatus[check.CheckID] = syncStatus{inSync: true}
}
} else if strings.Contains(err.Error(), permissionDenied) {
l.serviceStatus[id] = syncStatus{inSync: true}
l.logger.Printf("[WARN] agent: Service '%s' registration blocked by ACLs", id)
for _, check := range checks {
l.checkStatus[check.CheckID] = syncStatus{inSync: true}
}
return nil
}
return err
}

// syncChecks is used to sync checks to the server. If a check is associated
// with a service and the service is out of sync, it will piggyback with the
// sync so that it is updated as part of the same transaction.
func (l *localState) syncChecks(checkIDs []string) error {
checkMap := make(map[string]structs.HealthChecks)

for _, id := range checkIDs {
if check, ok := l.checks[id]; ok {
checkMap[check.ServiceID] = append(checkMap[check.ServiceID], check)
// syncCheck is used to sync a service to the server
func (l *localState) syncCheck(id string) error {
// Pull in the associated service if any
check := l.checks[id]
var service *structs.NodeService
if check.ServiceID != "" {
if serv, ok := l.services[check.ServiceID]; ok {
service = serv
}
}

for serviceID, checks := range checkMap {
// Create the sync request
req := structs.RegisterRequest{
Datacenter: l.config.Datacenter,
Node: l.config.NodeName,
Address: l.config.AdvertiseAddr,
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken},
}

// Attach the service if it should also be synced
if service, ok := l.services[serviceID]; ok {
if status, ok := l.serviceStatus[serviceID]; ok && !status.inSync {
req.Service = service
}
}

// Send single Check element for backwards compat with 0.4.x
if len(checks) == 1 {
req.Check = checks[0]
} else {
req.Checks = checks
}

// Perform the sync
var out struct{}
if err := l.iface.RPC("Catalog.Register", &req, &out); err != nil {
if strings.Contains(err.Error(), permissionDenied) {
for _, check := range checks {
l.checkStatus[check.CheckID] = syncStatus{inSync: true}
l.logger.Printf(
"[WARN] agent: Check '%s' registration blocked by ACLs",
check.CheckID)
}
return nil
}
return err
}

// Mark the checks and services as synced
if req.Service != nil {
l.serviceStatus[serviceID] = syncStatus{inSync: true}
l.logger.Printf("[INFO] agent: Synced service '%s'", serviceID)
}
for _, check := range checks {
l.checkStatus[check.CheckID] = syncStatus{inSync: true}
l.logger.Printf("[INFO] agent: Synced check '%s'", check.CheckID)
}
req := structs.RegisterRequest{
Datacenter: l.config.Datacenter,
Node: l.config.NodeName,
Address: l.config.AdvertiseAddr,
Service: service,
Check: l.checks[id],
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken},
}

return nil
var out struct{}
err := l.iface.RPC("Catalog.Register", &req, &out)
if err == nil {
l.checkStatus[id] = syncStatus{inSync: true}
l.logger.Printf("[INFO] agent: Synced check '%s'", id)
} else if strings.Contains(err.Error(), permissionDenied) {
l.checkStatus[id] = syncStatus{inSync: true}
l.logger.Printf("[WARN] agent: Check '%s' registration blocked by ACLs", id)
return nil
}
return err
}
120 changes: 120 additions & 0 deletions command/agent/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,126 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
}
}

func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
conf := nextConfig()
dir, agent := makeAgent(t, conf)
defer os.RemoveAll(dir)
defer agent.Shutdown()

testutil.WaitForLeader(t, agent.RPC, "dc1")

{
// Single check
srv := &structs.NodeService{
ID: "mysql",
Service: "mysql",
Tags: []string{"master"},
Port: 5000,
}
agent.state.AddService(srv)

chk := &structs.HealthCheck{
Node: agent.config.NodeName,
CheckID: "mysql",
Name: "mysql",
ServiceID: "mysql",
Status: structs.HealthPassing,
}
agent.state.AddCheck(chk)

// Sync the service once
if err := agent.state.syncService("mysql"); err != nil {
t.Fatalf("err: %s", err)
}

// We should have 2 services (consul included)
svcReq := structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: agent.config.NodeName,
}
var services structs.IndexedNodeServices
if err := agent.RPC("Catalog.NodeServices", &svcReq, &services); err != nil {
t.Fatalf("err: %v", err)
}
if len(services.NodeServices.Services) != 2 {
t.Fatalf("bad: %v", services.NodeServices.Services)
}

// We should have one health check
chkReq := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "mysql",
}
var checks structs.IndexedHealthChecks
if err := agent.RPC("Health.ServiceChecks", &chkReq, &checks); err != nil {
t.Fatalf("err: %v", err)
}
if len(checks.HealthChecks) != 1 {
t.Fatalf("bad: %v", checks)
}
}

{
// Multiple checks
srv := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"master"},
Port: 5000,
}
agent.state.AddService(srv)

chk1 := &structs.HealthCheck{
Node: agent.config.NodeName,
CheckID: "redis:1",
Name: "redis:1",
ServiceID: "redis",
Status: structs.HealthPassing,
}
agent.state.AddCheck(chk1)

chk2 := &structs.HealthCheck{
Node: agent.config.NodeName,
CheckID: "redis:2",
Name: "redis:2",
ServiceID: "redis",
Status: structs.HealthPassing,
}
agent.state.AddCheck(chk2)

// Sync the service once
if err := agent.state.syncService("redis"); err != nil {
t.Fatalf("err: %s", err)
}

// We should have 3 services (consul included)
svcReq := structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: agent.config.NodeName,
}
var services structs.IndexedNodeServices
if err := agent.RPC("Catalog.NodeServices", &svcReq, &services); err != nil {
t.Fatalf("err: %v", err)
}
if len(services.NodeServices.Services) != 3 {
t.Fatalf("bad: %v", services.NodeServices.Services)
}

// We should have two health checks
chkReq := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "redis",
}
var checks structs.IndexedHealthChecks
if err := agent.RPC("Health.ServiceChecks", &chkReq, &checks); err != nil {
t.Fatalf("err: %v", err)
}
if len(checks.HealthChecks) != 2 {
t.Fatalf("bad: %v", checks)
}
}
}

func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
conf := nextConfig()
conf.ACLDatacenter = "dc1"
Expand Down

0 comments on commit 1c64970

Please sign in to comment.