Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix uneven balancing #865

Merged
merged 1 commit into from
Dec 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,17 +705,17 @@ func (a *Agent) processFilteredNodes(job *Job) (map[string]string, map[string]st
}

// Create an array of node names to aid in computing resulting set based on cardinality
var nameIndex []string
var names []string
for name := range execNodes {
nameIndex = append(nameIndex, name)
names = append(names, name)
}

nodes := make(map[string]string)
rand.Seed(time.Now().UnixNano())
for ; cardinality > 0; cardinality-- {
// Pick a node, any node
randomIndex := rand.Intn(cardinality)
m := execNodes[nameIndex[randomIndex]]
randomIndex := rand.Intn(len(names))
m := execNodes[names[randomIndex]]

// Store name and address
if addr, ok := m.Tags["rpc_addr"]; ok {
Expand All @@ -725,8 +725,8 @@ func (a *Agent) processFilteredNodes(job *Job) (map[string]string, map[string]st
}

// Swap picked node with the first one and shorten array, so node can't get picked again
nameIndex[randomIndex], nameIndex[0] = nameIndex[0], nameIndex[randomIndex]
nameIndex = nameIndex[1:]
names[randomIndex], names[0] = names[0], names[randomIndex]
names = names[1:]
}

return nodes, tags, nil
Expand Down
46 changes: 35 additions & 11 deletions dkron/agent_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dkron

import (
"fmt"
"io/ioutil"
"os"
"testing"
Expand Down Expand Up @@ -283,24 +284,47 @@ func Test_processFilteredNodes(t *testing.T) {

// Test two tags matching same 3 servers and cardinality of 1 should always return 1 server

// Do this 10 times: an old bug caused this to sometimes succeed and sometimes fail due to the use of math.rand
// Statistically, with 10 tries about 3 should succeed and the rest should fail, if the code is buggy.
for i := 0; i < 10; i++ {
job8 := &Job{
Name: "test_job_7",
Tags: map[string]string{
"additional": "value:1",
"additional2": "value2:1",
},
}

// Do this multiple times: an old bug caused this to sometimes succeed and
// sometimes fail (=return no nodes at all) due to the use of math.rand
// Statistically, about 33% should succeed and the rest should fail if
// the code is buggy.
// Another bug caused one node to be favored over the others. With a
// large enough number of attempts, each node should be chosen about 1/3
// of the time.
job8 := &Job{
Name: "test_job_8",
Tags: map[string]string{
"additional": "value:1",
"additional2": "value2:1",
},
}
distrib := make(map[string]int)
var sampleSize = 1000
for i := 0; i < sampleSize; i++ {
nodes, tags, err = a1.processFilteredNodes(job8)
require.NoError(t, err)

assert.Len(t, nodes, 1)
assert.Equal(t, tags["additional"], "value")
assert.Equal(t, tags["additional2"], "value2")
for name := range nodes {
distrib[name] = distrib[name] + 1
}
}

// Each node must have been chosen between 30% and 36% of the time,
// for the distribution to be considered equal.
// Note: This test should almost never, but still can, fail even if the
// code is fine. To fix this, the randomizer ought to be mocked.
for name, count := range distrib {
fmt.Println(name, float64(count)/float64(sampleSize)*100.0, "%")
}
assert.Greater(t, float64(distrib["test1"])/float64(sampleSize), 0.3)
assert.Less(t, float64(distrib["test1"])/float64(sampleSize), 0.36)
assert.Greater(t, float64(distrib["test2"])/float64(sampleSize), 0.3)
assert.Less(t, float64(distrib["test2"])/float64(sampleSize), 0.36)
assert.Greater(t, float64(distrib["test3"])/float64(sampleSize), 0.3)
assert.Less(t, float64(distrib["test3"])/float64(sampleSize), 0.36)

// Clean up
a1.Stop()
Expand Down