Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Anti-entropy sync services/checks missing entirely from Consul #850

Merged
merged 4 commits into from
Apr 10, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 35 additions & 12 deletions command/agent/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,24 +310,47 @@ func (l *localState) setSyncState() error {
if err := l.iface.RPC("Health.NodeChecks", &req, &out2); err != nil {
return err
}
services := out1.NodeServices
checks := out2.HealthChecks

l.Lock()
defer l.Unlock()

if services != nil {
for id, service := range services.Services {
// If we don't have the service locally, deregister it
existing, ok := l.services[id]
if !ok {
l.serviceStatus[id] = syncStatus{remoteDelete: true}
continue
}
services := make(map[string]*structs.NodeService)
if out1.NodeServices != nil {
services = out1.NodeServices.Services
}

// If our definition is different, we need to update it
equal := reflect.DeepEqual(existing, service)
l.serviceStatus[id] = syncStatus{inSync: equal}
for id, _ := range l.services {
// If the local service doesn't exist remotely, then sync it
if _, ok := services[id]; !ok {
l.serviceStatus[id] = syncStatus{inSync: false}
}
}

for id, service := range services {
// If we don't have the service locally, deregister it
existing, ok := l.services[id]
if !ok {
l.serviceStatus[id] = syncStatus{remoteDelete: true}
continue
}

// If our definition is different, we need to update it
equal := reflect.DeepEqual(existing, service)
l.serviceStatus[id] = syncStatus{inSync: equal}
}

for id, _ := range l.checks {
// Sync any check which doesn't exist on the remote side
found := false
for _, check := range checks {
if check.CheckID == id {
found = true
break
}
}
if !found {
l.checkStatus[id] = syncStatus{inSync: false}
}
}

Expand Down
44 changes: 36 additions & 8 deletions command/agent/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
}
agent.state.AddService(srv5)

// Exists local, in sync, remote missing (create)
srv6 := &structs.NodeService{
ID: "cache",
Service: "cache",
Tags: []string{},
Port: 11211,
}
agent.state.AddService(srv6)
agent.state.serviceStatus["cache"] = syncStatus{inSync: true}

srv5_mod := new(structs.NodeService)
*srv5_mod = *srv5
srv5_mod.Address = "127.0.0.1"
Expand All @@ -110,8 +120,8 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
t.Fatalf("err: %v", err)
}

// We should have 5 services (consul included)
if len(services.NodeServices.Services) != 5 {
// We should have 6 services (consul included)
if len(services.NodeServices.Services) != 6 {
t.Fatalf("bad: %v", services.NodeServices.Services)
}

Expand All @@ -134,6 +144,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
if !reflect.DeepEqual(serv, srv5) {
t.Fatalf("bad: %v %v", serv, srv5)
}
case "cache":
if !reflect.DeepEqual(serv, srv6) {
t.Fatalf("bad: %v %v", serv, srv6)
}
case "consul":
// ignore
default:
Expand All @@ -142,10 +156,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
}

// Check the local state
if len(agent.state.services) != 5 {
if len(agent.state.services) != 6 {
t.Fatalf("bad: %v", agent.state.services)
}
if len(agent.state.serviceStatus) != 5 {
if len(agent.state.serviceStatus) != 6 {
t.Fatalf("bad: %v", agent.state.serviceStatus)
}
for name, status := range agent.state.serviceStatus {
Expand Down Expand Up @@ -439,6 +453,16 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
t.Fatalf("err: %v", err)
}

// Exists local, in sync, remote missing (create)
chk5 := &structs.HealthCheck{
Node: agent.config.NodeName,
CheckID: "cache",
Name: "cache",
Status: structs.HealthPassing,
}
agent.state.AddCheck(chk5)
agent.state.checkStatus["cache"] = syncStatus{inSync: true}

// Trigger anti-entropy run and wait
agent.StartSync()
time.Sleep(200 * time.Millisecond)
Expand All @@ -453,8 +477,8 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
t.Fatalf("err: %v", err)
}

// We should have 4 services (serf included)
if len(checks.HealthChecks) != 4 {
// We should have 5 checks (serf included)
if len(checks.HealthChecks) != 5 {
t.Fatalf("bad: %v", checks)
}

Expand All @@ -473,6 +497,10 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
if !reflect.DeepEqual(chk, chk3) {
t.Fatalf("bad: %v %v", chk, chk3)
}
case "cache":
if !reflect.DeepEqual(chk, chk5) {
t.Fatalf("bad: %v %v", chk, chk5)
}
case "serfHealth":
// ignore
default:
Expand All @@ -481,10 +509,10 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
}

// Check the local state
if len(agent.state.checks) != 3 {
if len(agent.state.checks) != 4 {
t.Fatalf("bad: %v", agent.state.checks)
}
if len(agent.state.checkStatus) != 3 {
if len(agent.state.checkStatus) != 4 {
t.Fatalf("bad: %v", agent.state.checkStatus)
}
for name, status := range agent.state.checkStatus {
Expand Down