Skip to content

Commit

Permalink
[Performance On Large clusters] Reduce updates on large services (#4720)
Browse files Browse the repository at this point in the history
* [Performance On Large clusters] Checks do update services/nodes only when really modified to avoid too many updates on very large clusters

In a large cluster, when having a few thousands of nodes, the anti-entropy
mechanism performs lots of changes (several per seconds) while
there is no real change. This patch wants to improve this in order
to increase Consul scalability when using many blocking requests on
health for instance.

* [Performance for large clusters] Only updates index of service if service is really modified

* [Performance for large clusters] Only updates index of nodes if node is really modified

* Added comments / ensure IsSame() has clear semantics

* Avoid having modified boolean, return nil directly if stutures are Same

* Fixed unstable unit tests TestLeader_ChangeServerID

* Rewrite TestNode_IsSame() for better readability as suggested by @banks

* Rename ServiceNode.IsSame() into IsSameService() + added unit tests

* Do not duplicate TestStructs_ServiceNode_Conversions() and increase test coverage of IsSameService

* Clearer documentation in IsSameService

* Take into account ServiceProxy into ServiceNode.IsSameService()

* Fixed IsSameService() with all new structures
  • Loading branch information
pierresouchay authored and banks committed Oct 11, 2018
1 parent f0c06a9 commit 51b33ef
Show file tree
Hide file tree
Showing 6 changed files with 337 additions and 47 deletions.
2 changes: 2 additions & 0 deletions agent/consul/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,8 @@ func TestLeader_ChangeServerID(t *testing.T) {
defer os.RemoveAll(dir4)
defer s4.Shutdown()
joinLAN(t, s4, s1)
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
testrpc.WaitForTestAgent(t, s4.RPC, "dc1")
servers[2] = s4

// While integrating #3327 it uncovered that this test was flaky. The
Expand Down
63 changes: 47 additions & 16 deletions agent/consul/state/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,11 @@ func (s *Store) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) err
// Get the indexes.
if n != nil {
node.CreateIndex = n.CreateIndex
node.ModifyIndex = n.ModifyIndex
// We do not need to update anything
if node.IsSame(n) {
return nil
}
node.ModifyIndex = idx
} else {
node.CreateIndex = idx
Expand Down Expand Up @@ -687,14 +692,6 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st
// conversion doesn't populate any of the node-specific information.
// That's always populated when we read from the state store.
entry := svc.ToServiceNode(node)
if existing != nil {
entry.CreateIndex = existing.(*structs.ServiceNode).CreateIndex
entry.ModifyIndex = idx
} else {
entry.CreateIndex = idx
entry.ModifyIndex = idx
}

// Get the node
n, err := tx.First("nodes", "id", node)
if err != nil {
Expand All @@ -703,6 +700,21 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st
if n == nil {
return ErrMissingNode
}
if existing != nil {
serviceNode := existing.(*structs.ServiceNode)
entry.CreateIndex = serviceNode.CreateIndex
entry.ModifyIndex = serviceNode.ModifyIndex
// We cannot return here because: we want to keep existing behaviour (ex: failed node lookup -> ErrMissingNode)
// It might be modified in future, but it requires changing many unit tests
// Enforcing saving the entry also ensures that if we add default values in .ToServiceNode()
// those values will be saved even if node is not really modified for a while.
if entry.IsSameService(serviceNode) {
return nil
}
} else {
entry.CreateIndex = idx
}
entry.ModifyIndex = idx

// Insert the service and update the index
if err := tx.Insert("services", entry); err != nil {
Expand Down Expand Up @@ -1236,8 +1248,9 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec

// Set the indexes
if existing != nil {
hc.CreateIndex = existing.(*structs.HealthCheck).CreateIndex
hc.ModifyIndex = idx
existingCheck := existing.(*structs.HealthCheck)
hc.CreateIndex = existingCheck.CreateIndex
hc.ModifyIndex = existingCheck.ModifyIndex
} else {
hc.CreateIndex = idx
hc.ModifyIndex = idx
Expand All @@ -1257,6 +1270,7 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec
return ErrMissingNode
}

modified := true
// If the check is associated with a service, check that we have
// a registration for the service.
if hc.ServiceID != "" {
Expand All @@ -1272,14 +1286,24 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec
svc := service.(*structs.ServiceNode)
hc.ServiceName = svc.ServiceName
hc.ServiceTags = svc.ServiceTags
if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
if existing != nil && existing.(*structs.HealthCheck).IsSame(hc) {
modified = false
} else {
// Check has been modified, we trigger a index service change
if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
}
} else {
// Update the status for all the services associated with this node
err = s.updateAllServiceIndexesOfNode(tx, idx, hc.Node)
if err != nil {
return err
if existing != nil && existing.(*structs.HealthCheck).IsSame(hc) {
modified = false
} else {
// Since the check has been modified, it impacts all services of node
// Update the status for all the services associated with this node
err = s.updateAllServiceIndexesOfNode(tx, idx, hc.Node)
if err != nil {
return err
}
}
}

Expand All @@ -1303,6 +1327,13 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec
}
}
}
if modified {
// We update the modify index, ONLY if something has changed, thus
// With constant output, no change is seen when watching a service
// With huge number of nodes where anti-entropy updates continuously
// the checks, but not the values within the check
hc.ModifyIndex = idx
}

// Persist the check registration in the db.
if err := tx.Insert("checks", hc); err != nil {
Expand Down
117 changes: 87 additions & 30 deletions agent/consul/state/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
CheckID: "check1",
Name: "check",
Status: "critical",
RaftIndex: structs.RaftIndex{CreateIndex: 3, ModifyIndex: 4},
RaftIndex: structs.RaftIndex{CreateIndex: 3, ModifyIndex: 3},
},
&structs.HealthCheck{
Node: "node1",
Expand Down Expand Up @@ -491,8 +491,8 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
}
c1 := out[0]
if c1.Node != nodeName || c1.CheckID != "check1" || c1.Name != "check" ||
c1.CreateIndex != 3 || c1.ModifyIndex != 4 {
t.Fatalf("bad check returned: %#v", c1)
c1.CreateIndex != 3 || c1.ModifyIndex != 3 {
t.Fatalf("bad check returned, should not be modified: %#v", c1)
}

c2 := out[1]
Expand All @@ -508,6 +508,9 @@ func deprecatedEnsureNodeWithoutIDCanRegister(t *testing.T, s *Store, nodeName s
in := &structs.Node{
Node: nodeName,
Address: "1.1.1.9",
Meta: map[string]string{
"version": string(txIdx),
},
}
if err := s.EnsureNode(txIdx, in); err != nil {
t.Fatalf("err: %s", err)
Expand All @@ -517,10 +520,10 @@ func deprecatedEnsureNodeWithoutIDCanRegister(t *testing.T, s *Store, nodeName s
t.Fatalf("err: %s", err)
}
if idx != txIdx {
t.Fatalf("index should be %q, was: %q", txIdx, idx)
t.Fatalf("index should be %v, was: %v", txIdx, idx)
}
if out.Node != nodeName {
t.Fatalf("unexpected result out = %q, nodeName supposed to be %s", out, nodeName)
t.Fatalf("unexpected result out = %v, nodeName supposed to be %s", out, nodeName)
}
}

Expand Down Expand Up @@ -726,8 +729,12 @@ func TestStateStore_EnsureNode(t *testing.T) {
}

// Update the node registration
in.Address = "1.1.1.2"
if err := s.EnsureNode(2, in); err != nil {
in2 := &structs.Node{
ID: in.ID,
Node: in.Node,
Address: "1.1.1.2",
}
if err := s.EnsureNode(2, in2); err != nil {
t.Fatalf("err: %s", err)
}

Expand All @@ -745,15 +752,32 @@ func TestStateStore_EnsureNode(t *testing.T) {
t.Fatalf("bad index: %d", idx)
}

// Re-inserting data should not modify ModifiedIndex
if err := s.EnsureNode(3, in2); err != nil {
t.Fatalf("err: %s", err)
}
idx, out, err = s.GetNode("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if out.CreateIndex != 1 || out.ModifyIndex != 2 || out.Address != "1.1.1.2" {
t.Fatalf("node was modified: %#v", out)
}

// Node upsert preserves the create index
if err := s.EnsureNode(3, in); err != nil {
in3 := &structs.Node{
ID: in.ID,
Node: in.Node,
Address: "1.1.1.3",
}
if err := s.EnsureNode(3, in3); err != nil {
t.Fatalf("err: %s", err)
}
idx, out, err = s.GetNode("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if out.CreateIndex != 1 || out.ModifyIndex != 3 || out.Address != "1.1.1.2" {
if out.CreateIndex != 1 || out.ModifyIndex != 3 || out.Address != "1.1.1.3" {
t.Fatalf("node was modified: %#v", out)
}
if idx != 3 {
Expand Down Expand Up @@ -2177,32 +2201,60 @@ func TestStateStore_EnsureCheck(t *testing.T) {
t.Fatalf("bad: %#v", checks[0])
}

// Modify the health check
check.Output = "bbb"
if err := s.EnsureCheck(4, check); err != nil {
t.Fatalf("err: %s", err)
}
testCheckOutput := func(expectedNodeIndex, expectedIndexForCheck uint64, outputTxt string) {
// Check that we successfully updated
idx, checks, err = s.NodeChecks(nil, "node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != expectedNodeIndex {
t.Fatalf("bad index: %d", idx)
}

// Check that we successfully updated
idx, checks, err = s.NodeChecks(nil, "node1")
if err != nil {
t.Fatalf("err: %s", err)
if len(checks) != 1 {
t.Fatalf("wrong number of checks: %d", len(checks))
}
if checks[0].Output != outputTxt {
t.Fatalf("wrong check output: %#v", checks[0])
}
if checks[0].CreateIndex != 3 || checks[0].ModifyIndex != expectedIndexForCheck {
t.Fatalf("bad index: %#v, expectedIndexForCheck:=%v ", checks[0], expectedIndexForCheck)
}
}
if idx != 4 {
t.Fatalf("bad index: %d", idx)
// Do not really modify the health check content the health check
check = &structs.HealthCheck{
Node: "node1",
CheckID: "check1",
Name: "redis check",
Status: api.HealthPassing,
Notes: "test check",
Output: "aaa",
ServiceID: "service1",
ServiceName: "redis",
}
if len(checks) != 1 {
t.Fatalf("wrong number of checks: %d", len(checks))
if err := s.EnsureCheck(4, check); err != nil {
t.Fatalf("err: %s", err)
}
if checks[0].Output != "bbb" {
t.Fatalf("wrong check output: %#v", checks[0])
testCheckOutput(4, 3, check.Output)

// Do modify the heathcheck
check = &structs.HealthCheck{
Node: "node1",
CheckID: "check1",
Name: "redis check",
Status: api.HealthPassing,
Notes: "test check",
Output: "bbbmodified",
ServiceID: "service1",
ServiceName: "redis",
}
if checks[0].CreateIndex != 3 || checks[0].ModifyIndex != 4 {
t.Fatalf("bad index: %#v", checks[0])
if err := s.EnsureCheck(5, check); err != nil {
t.Fatalf("err: %s", err)
}
testCheckOutput(5, 5, "bbbmodified")

// Index tables were updated
if idx := s.maxIndex("checks"); idx != 4 {
if idx := s.maxIndex("checks"); idx != 5 {
t.Fatalf("bad index: %d", idx)
}
}
Expand Down Expand Up @@ -2890,7 +2942,7 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
}

// Node updates alter the returned index and fire the watch.
testRegisterNode(t, s, 8, "node1")
testRegisterNodeWithChange(t, s, 8, "node1")
if !watchFired(ws) {
t.Fatalf("bad")
}
Expand All @@ -2905,7 +2957,8 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
}

// Service updates alter the returned index and fire the watch.
testRegisterService(t, s, 9, "node1", "service1")

testRegisterServiceWithChange(t, s, 9, "node1", "service1", true)
if !watchFired(ws) {
t.Fatalf("bad")
}
Expand Down Expand Up @@ -3261,6 +3314,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
ID: "service1",
Service: "service1",
Address: "1.1.1.1",
Meta: make(map[string]string),
Port: 1111,
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
Expand All @@ -3272,6 +3326,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
ID: "service2",
Service: "service2",
Address: "1.1.1.1",
Meta: make(map[string]string),
Port: 1111,
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
Expand Down Expand Up @@ -3313,6 +3368,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
Service: "service1",
Address: "1.1.1.1",
Port: 1111,
Meta: make(map[string]string),
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
CreateIndex: 4,
Expand All @@ -3324,6 +3380,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
Service: "service2",
Address: "1.1.1.1",
Port: 1111,
Meta: make(map[string]string),
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
Expand All @@ -3344,7 +3401,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
t.Fatalf("bad index: %d", idx)
}
if len(dump) != 1 || !reflect.DeepEqual(dump[0], expect[0]) {
t.Fatalf("bad: %#v", dump)
t.Fatalf("bad: len=%#v dump=%#v expect=%#v", len(dump), dump[0], expect[0])
}

// Generate a dump of all the nodes
Expand Down
Loading

0 comments on commit 51b33ef

Please sign in to comment.