Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable tag drift 03 #1187

Merged
merged 13 commits into from
Sep 11, 2015
3 changes: 3 additions & 0 deletions command/agent/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,9 @@ func (l *localState) setSyncState() error {
}

// If our definition is different, we need to update it
if existing.EnableTagOverride {
existing.Tags = service.Tags
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At first I was a little weirded out that the local service is changing right here, but you have the lock and I'm not sure doing it elsewhere would be any cleaner. This looks ok.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does seem a bit odd though. I think it might be better to have an equality checker function rather than leaning on reflect.DeepEqual here. The current logic will also become problematic when switching to an in-memory state store (which is on our short-term roadmap), since the service is a pointer and you are actually swapping out part of the data here. Having an equality check function would also make this easier to test.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ryanuber That makes sense, but I believe it's a little out of scope for this particular pull request. I would be happy to contribute independent of this change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ryanuber don't we require that the local data changes to match the server if drift is enabled so that we don't sync back to the old tags if the service definition changes for some other reason? Also, this is the localState which just has maps vs. memdb. Want to make sure I understand your concern before we pull the trigger on this one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@slackpad if we are doing a DeepEqual, then you are right, the definitions have to be perfectly equivalent. What I was suggesting was having like a ServiceEquals(s1, s2 *structs.NodeService, drift bool) bool kind of function, which would just skip the tag check if drift is enabled.

You're right about the local state, my bad.

One other thing though - If we are modifying the local agent state then we aren't enabling drift, we are instead flipping it so that the remote state always wins, unless I'm missing something else here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ryanuber I don't think I explained myself very well re-reading what I wrote :-)

I was thinking that it's important we do the copy here because if DeepEqual detects a change after that (a real change we want to sync), we want the tags from the server going back up to the catalog, not the local ones as if we had the compare function. I think if we didn't copy then we'd revert the drifted tags once the service changes for any other reason.

}
equal := reflect.DeepEqual(existing, service)
l.serviceStatus[id] = syncStatus{inSync: equal}
}
Expand Down
96 changes: 96 additions & 0 deletions command/agent/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,102 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
}
}

func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
conf := nextConfig()
dir, agent := makeAgent(t, conf)
defer os.RemoveAll(dir)
defer agent.Shutdown()

testutil.WaitForLeader(t, agent.RPC, "dc1")

args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: agent.config.NodeName,
Address: "127.0.0.1",
}
var out struct{}

// EnableTagOverride = true
srv1 := &structs.NodeService{
ID: "svc_id1",
Service: "svc1",
Tags: []string{"tag1"},
Port: 6100,
EnableTagOverride: true,
}
agent.state.AddService(srv1, "")
srv1_mod := new(structs.NodeService)
*srv1_mod = *srv1
srv1_mod.Port = 7100
srv1_mod.Tags = []string{"tag1_mod"}
args.Service = srv1_mod
if err := agent.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}

// EnableTagOverride = false
srv2 := &structs.NodeService{
ID: "svc_id2",
Service: "svc2",
Tags: []string{"tag2"},
Port: 6200,
EnableTagOverride: false,
}
agent.state.AddService(srv2, "")
srv2_mod := new(structs.NodeService)
*srv2_mod = *srv2
srv2_mod.Port = 7200
srv2_mod.Tags = []string{"tag2_mod"}
args.Service = srv2_mod
if err := agent.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}

// Trigger anti-entropy run and wait
agent.StartSync()
time.Sleep(200 * time.Millisecond)

// Verify that we are in sync
req := structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: agent.config.NodeName,
}
var services structs.IndexedNodeServices
if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
t.Fatalf("err: %v", err)
}

// All the services should match
for id, serv := range services.NodeServices.Services {
switch id {
case "svc_id1":
if serv.ID != "svc_id1" ||
serv.Service != "svc1" ||
serv.Port != 6100 ||
!reflect.DeepEqual(serv.Tags, []string{"tag1_mod"}) {
t.Fatalf("bad: %v %v", serv, srv1)
}
case "svc_id2":
if serv.ID != "svc_id2" ||
serv.Service != "svc2" ||
serv.Port != 6200 ||
!reflect.DeepEqual(serv.Tags, []string{"tag2"}) {
t.Fatalf("bad: %v %v", serv, srv2)
}
case "consul":
// ignore
default:
t.Fatalf("unexpected service: %v", id)
}
}

for name, status := range agent.state.serviceStatus {
if !status.inSync {
t.Fatalf("should be in sync: %v %v", name, status)
}
}
}

func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
conf := nextConfig()
dir, agent := makeAgent(t, conf)
Expand Down
28 changes: 15 additions & 13 deletions command/agent/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,25 @@ import (

// ServiceDefinition is used to JSON decode the Service definitions
type ServiceDefinition struct {
ID string
Name string
Tags []string
Address string
Port int
Check CheckType
Checks CheckTypes
Token string
ID string
Name string
Tags []string
Address string
Port int
Check CheckType
Checks CheckTypes
Token string
EnableTagOverride bool
}

func (s *ServiceDefinition) NodeService() *structs.NodeService {
ns := &structs.NodeService{
ID: s.ID,
Service: s.Name,
Tags: s.Tags,
Address: s.Address,
Port: s.Port,
ID: s.ID,
Service: s.Name,
Tags: s.Tags,
Address: s.Address,
Port: s.Port,
EnableTagOverride: s.EnableTagOverride,
}
if ns.ID == "" && ns.Service != "" {
ns.ID = ns.Service
Expand Down
2 changes: 1 addition & 1 deletion command/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestExecCommandRun(t *testing.T) {

code := c.Run(args)
if code != 0 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
t.Fatalf("bad: %d. Error:%#v (std)Output:%#v", code, ui.ErrorWriter.String(), ui.OutputWriter.String())
}

if !strings.Contains(ui.OutputWriter.String(), "load") {
Expand Down
12 changes: 6 additions & 6 deletions consul/catalog_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ func TestCatalogListServices(t *testing.T) {

// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false})

if err := client.Call("Catalog.ListServices", &args, &out); err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -544,7 +544,7 @@ func TestCatalogListServices_Blocking(t *testing.T) {
go func() {
time.Sleep(100 * time.Millisecond)
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false})
}()

// Re-run the query
Expand Down Expand Up @@ -625,7 +625,7 @@ func TestCatalogListServices_Stale(t *testing.T) {

// Inject a fake service
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false})

// Run the query, do not wait for leader!
if err := client.Call("Catalog.ListServices", &args, &out); err != nil {
Expand Down Expand Up @@ -666,7 +666,7 @@ func TestCatalogListServiceNodes(t *testing.T) {

// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false})

if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -709,8 +709,8 @@ func TestCatalogNodeServices(t *testing.T) {

// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000})
s1.fsm.State().EnsureService(3, "foo", &structs.NodeService{"web", "web", nil, "127.0.0.1", 80})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false})
s1.fsm.State().EnsureService(3, "foo", &structs.NodeService{"web", "web", nil, "127.0.0.1", 80, false})

if err := client.Call("Catalog.NodeServices", &args, &out); err != nil {
t.Fatalf("err: %v", err)
Expand Down
8 changes: 4 additions & 4 deletions consul/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,10 @@ func TestFSM_SnapshotRestore(t *testing.T) {
// Add some state
fsm.state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
fsm.state.EnsureNode(2, structs.Node{"baz", "127.0.0.2"})
fsm.state.EnsureService(3, "foo", &structs.NodeService{"web", "web", nil, "127.0.0.1", 80})
fsm.state.EnsureService(4, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000})
fsm.state.EnsureService(5, "baz", &structs.NodeService{"web", "web", nil, "127.0.0.2", 80})
fsm.state.EnsureService(6, "baz", &structs.NodeService{"db", "db", []string{"secondary"}, "127.0.0.2", 5000})
fsm.state.EnsureService(3, "foo", &structs.NodeService{"web", "web", nil, "127.0.0.1", 80, false})
fsm.state.EnsureService(4, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false})
fsm.state.EnsureService(5, "baz", &structs.NodeService{"web", "web", nil, "127.0.0.2", 80, false})
fsm.state.EnsureService(6, "baz", &structs.NodeService{"db", "db", []string{"secondary"}, "127.0.0.2", 5000, false})
fsm.state.EnsureCheck(7, &structs.HealthCheck{
Node: "foo",
CheckID: "web",
Expand Down
Loading