Skip to content

Commit

Permalink
Refactor processFilteredNodes for testing (#968)
Browse files Browse the repository at this point in the history
* Refactor processFilteredNodes for testing

I was digging into the intermittent failures of Test_processFilteredNodes and had trouble really understanding what the function, and the complimenting filterNodes were doing so I broke some of the logic out into smaller functions to encapsulate the logic and make testing of each portion easier. I added unit tests for these smaller functions to help catch potential issues with each. I left Test_processFilteredNodes as it serves well as a higher level test of all the other funtions.

This adjusts the thresholds for the even distribution of picking nodes when the cardinality is lower than the possible nodes. Due to the randomness, it is quite hard to test how evenly distributed the values are within the original thresholds. This would cause many intermittent test failures when the values were just over or under the thresholds.

This also moves the seed of math.Rand into the agent startup. Reseeding the randomness every function call seems like extra overhead for no value. I do not believe that constantly reseeding the randomness increases randomness in anyway.
  • Loading branch information
sysadmind authored May 26, 2021
1 parent d1db732 commit 858ec29
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 44 deletions.
59 changes: 21 additions & 38 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -151,6 +150,9 @@ func (a *Agent) Start() error {
log := InitLogger(a.config.LogLevel, a.config.NodeName)
a.logger = log

// Initialize rand with current time
rand.Seed(time.Now().UnixNano())

// Normalize configured addresses
a.config.normalizeAddrs()

Expand Down Expand Up @@ -704,14 +706,14 @@ func (a *Agent) processFilteredNodes(job *Job) (map[string]string, map[string]st
tags["region"] = a.config.Region

// Make a set of all members
execNodes := make(map[string]serf.Member)
allNodes := make(map[string]serf.Member)
for _, member := range a.serf.Members() {
if member.Status == serf.StatusAlive {
execNodes[member.Name] = member
allNodes[member.Name] = member
}
}

execNodes, tags, cardinality, err := filterNodes(execNodes, tags)
execNodes, tags, cardinality, err := filterNodes(allNodes, tags)
if err != nil {
return nil, nil, err
}
Expand All @@ -723,7 +725,6 @@ func (a *Agent) processFilteredNodes(job *Job) (map[string]string, map[string]st
}

nodes := make(map[string]string)
rand.Seed(time.Now().UnixNano())
for ; cardinality > 0; cardinality-- {
// Pick a node, any node
randomIndex := rand.Intn(len(names))
Expand All @@ -744,52 +745,34 @@ func (a *Agent) processFilteredNodes(job *Job) (map[string]string, map[string]st
return nodes, tags, nil
}

// filterNodes determines which of the execNodes have the given tags
// Out param! The incoming execNodes map is modified.
// filterNodes determines which of the provided nodes have the given tags
// Returns:
// * the (modified) map of execNodes
// * a map of tag values without cardinality
// * the map of allNodes that match the provided tags
// * a clean map of tag values without cardinality
// * cardinality, i.e. the max number of nodes that should be targeted, regardless of the
// number of nodes in the resulting map.
// * an error if a cardinality was malformed
func filterNodes(execNodes map[string]serf.Member, tags map[string]string) (map[string]serf.Member, map[string]string, int, error) {
cardinality := int(^uint(0) >> 1) // MaxInt
func filterNodes(allNodes map[string]serf.Member, tags map[string]string) (map[string]serf.Member, map[string]string, int, error) {
ct, cardinality, err := cleanTags(tags)
if err != nil {
return nil, nil, 0, err
}

cleanTags := make(map[string]string)
matchingNodes := make(map[string]serf.Member)

// Filter nodes that lack tags
// Determine lowest cardinality along the way
for jtk, jtv := range tags {
tc := strings.Split(jtv, ":")
tv := tc[0]

// Set original tag to clean tag
cleanTags[jtk] = tv

// Remove nodes that do not have the selected tags
for name, member := range execNodes {
if mtv, tagPresent := member.Tags[jtk]; !tagPresent || mtv != tv {
delete(execNodes, name)
}
}

if len(tc) == 2 {
tagCardinality, err := strconv.Atoi(tc[1])
if err != nil {
return nil, nil, 0, err
}
if tagCardinality < cardinality {
cardinality = tagCardinality
}
for name, member := range allNodes {
if nodeMatchesTags(member, ct) {
matchingNodes[name] = member
}
}

// limit the cardinality to the number of possible nodes
if len(execNodes) < cardinality {
cardinality = len(execNodes)
if len(matchingNodes) < cardinality {
cardinality = len(matchingNodes)
}

return execNodes, cleanTags, cardinality, nil
return matchingNodes, ct, cardinality, nil
}

// This function is called when a client request the RPCAddress
Expand Down
139 changes: 133 additions & 6 deletions dkron/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"io/ioutil"
"log"
"os"
"reflect"
"testing"
"time"

"github.com/hashicorp/serf/serf"
"github.com/hashicorp/serf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -328,12 +330,12 @@ func Test_processFilteredNodes(t *testing.T) {
for name, count := range distrib {
fmt.Println(name, float64(count)/float64(sampleSize)*100.0, "%")
}
assert.Greater(t, float64(distrib["test1"])/float64(sampleSize), 0.3)
assert.Less(t, float64(distrib["test1"])/float64(sampleSize), 0.36)
assert.Greater(t, float64(distrib["test2"])/float64(sampleSize), 0.3)
assert.Less(t, float64(distrib["test2"])/float64(sampleSize), 0.36)
assert.Greater(t, float64(distrib["test3"])/float64(sampleSize), 0.3)
assert.Less(t, float64(distrib["test3"])/float64(sampleSize), 0.36)
assert.Greater(t, float64(distrib["test1"])/float64(sampleSize), 0.25)
assert.Less(t, float64(distrib["test1"])/float64(sampleSize), 0.40)
assert.Greater(t, float64(distrib["test2"])/float64(sampleSize), 0.25)
assert.Less(t, float64(distrib["test2"])/float64(sampleSize), 0.40)
assert.Greater(t, float64(distrib["test3"])/float64(sampleSize), 0.25)
assert.Less(t, float64(distrib["test3"])/float64(sampleSize), 0.40)

// Clean up
a1.Stop()
Expand Down Expand Up @@ -460,3 +462,128 @@ func TestAgentConfig(t *testing.T) {

a.Stop()
}

func Test_filterNodes(t *testing.T) {
nodes := map[string]serf.Member{
"node1": {
Tags: map[string]string{
"region": "global",
"tag": "test",
"just1": "value",
"tagfor2": "2",
},
},
"node2": {
Tags: map[string]string{
"region": "global",
"tag": "test",
"just2": "value",
"tagfor2": "2",
},
},
"node3": {
Tags: map[string]string{
"region": "global",
"tag": "test",
"just3": "value",
},
},
}
type args struct {
execNodes map[string]serf.Member
tags map[string]string
}
tests := []struct {
name string
args args
want map[string]serf.Member
want1 map[string]string
want2 int
wantErr bool
}{
{
name: "All nodes tag",
args: args{
execNodes: nodes,
tags: map[string]string{"tag": "test"},
},
want: nodes,
want1: map[string]string{"tag": "test"},
want2: 3,
wantErr: false,
},
{
name: "Just node1 tag",
args: args{
execNodes: nodes,
tags: map[string]string{"just1": "value"},
},
want: map[string]serf.Member{"node1": nodes["node1"]},
want1: map[string]string{"just1": "value"},
want2: 1,
wantErr: false,
},
{
name: "Just node2 tag",
args: args{
execNodes: nodes,
tags: map[string]string{"just2": "value"},
},
want: map[string]serf.Member{"node2": nodes["node2"]},
want1: map[string]string{"just2": "value"},
want2: 1,
wantErr: false,
},
{
name: "Matching 2 nodes",
args: args{
execNodes: nodes,
tags: map[string]string{"tagfor2": "2"},
},
want: map[string]serf.Member{"node1": nodes["node1"], "node2": nodes["node2"]},
want1: map[string]string{"tagfor2": "2"},
want2: 2,
wantErr: false,
},
{
name: "No matching nodes",
args: args{
execNodes: nodes,
tags: map[string]string{"unknown": "value"},
},
want: map[string]serf.Member{},
want1: map[string]string{"unknown": "value"},
want2: 0,
wantErr: false,
},
{
name: "All nodes low cardinality",
args: args{
execNodes: nodes,
tags: map[string]string{"tag": "test:1"},
},
want: nodes,
want1: map[string]string{"tag": "test"},
want2: 1,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, got1, got2, err := filterNodes(tt.args.execNodes, tt.args.tags)
if (err != nil) != tt.wantErr {
t.Errorf("filterNodes() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("filterNodes() got = %v, want %v", got, tt.want)
}
if !reflect.DeepEqual(got1, tt.want1) {
t.Errorf("filterNodes() got1 = %v, want %v", got1, tt.want1)
}
if got2 != tt.want2 {
t.Errorf("filterNodes() got2 = %v, want %v", got2, tt.want2)
}
})
}
}
54 changes: 54 additions & 0 deletions dkron/tags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package dkron

import (
"fmt"
"strconv"
"strings"

"github.com/hashicorp/serf/serf"
)

// cleanTags takes the tag spec and returns strictly key:value pairs
// along with the lowest cardinality specified
func cleanTags(tags map[string]string) (map[string]string, int, error) {
cardinality := int(^uint(0) >> 1) // MaxInt

cleanTags := make(map[string]string, len(tags))

for k, v := range tags {
vparts := strings.Split(v, ":")

cleanTags[k] = vparts[0]

// If a cardinality is specified (i.e. "value:3") and it is lower than our
// max cardinality, lower the max
if len(vparts) == 2 {
tagCard, err := strconv.Atoi(vparts[1])
if err != nil {
// Tag value is malformed
return nil, 0, fmt.Errorf("improper cardinality specified for tag %s: %v", k, vparts[1])
}

if tagCard < cardinality {
cardinality = tagCard
}
}
}

return cleanTags, cardinality, nil
}

// nodeMatchesTags encapsulates the logic of testing if a node matches all of the provided tags
func nodeMatchesTags(node serf.Member, tags map[string]string) bool {
for k, v := range tags {
nodeVal, present := node.Tags[k]
if !present {
return false
}
if nodeVal != v {
return false
}
}
// If we matched all key:value pairs, the node matches the tags
return true
}
66 changes: 66 additions & 0 deletions dkron/tags_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package dkron

import (
"reflect"
"testing"
)

func Test_cleanTags(t *testing.T) {
maxInt := int(^uint(0) >> 1)
type args struct {
tags map[string]string
}
tests := []struct {
name string
args args
want map[string]string
want1 int
wantErr bool
}{
{
name: "Clean Tags",
args: args{map[string]string{"key1": "value1", "key2": "value2"}},
want: map[string]string{"key1": "value1", "key2": "value2"},
want1: maxInt,
wantErr: false,
},
{
name: "With Cardinality",
args: args{map[string]string{"key1": "value1", "key2": "value2:5"}},
want: map[string]string{"key1": "value1", "key2": "value2"},
want1: 5,
wantErr: false,
},
{
name: "With Multiple Cardinalities",
args: args{map[string]string{"key1": "value1:2", "key2": "value2:5"}},
want: map[string]string{"key1": "value1", "key2": "value2"},
want1: 2,
wantErr: false,
},
{
name: "With String Cardinality",
args: args{map[string]string{"key1": "value1", "key2": "value2:cardinality"}},
want: nil,
want1: 0,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, got1, err := cleanTags(tt.args.tags)
if (err != nil) != tt.wantErr {
t.Errorf("cleanTags() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Logf("got map: %#v", got)
t.Logf("want map: %#v", tt.want)
t.Errorf("cleanTags() got = %v, want %v", got, tt.want)
}
if got1 != tt.want1 {
t.Errorf("cleanTags() got1 = %v, want %v", got1, tt.want1)
}
})
}
}

0 comments on commit 858ec29

Please sign in to comment.