# Fix flaky TestHare_ReconstructForward (#6299)
## Motivation

Fixes the flaky `TestHare_ReconstructForward` test.

Closes #6293.
fasmat committed Aug 29, 2024
1 parent ad9e419 commit a4f3e85
Showing 3 changed files with 61 additions and 79 deletions.
19 changes: 10 additions & 9 deletions fetch/wire_types.go
@@ -120,15 +120,16 @@ type MaliciousIDs struct {
}

type EpochData struct {
- // When changing this value also check
- // - the size of `ResponseMessage.Data` above
- // - the size of `Response.Data` in `p2p/server/server.go`
- // - the size of `NodeIDs` in `MaliciousIDs` above
- // - the size of `Set` in `EpochActiveSet` in common/types/activation.go
- // - the size of `EligibilityProofs` in the type `Ballot` in common/types/ballot.go
- // - the size of `Rewards` in the type `InnerBlock` in common/types/block.go
- // - the size of `Ballots` in the type `LayerData` below
- // - the size of `Proposals` in the type `Value` in hare3/types.go
+ // When changing this value also check the size of
+ // - `ResponseMessage.Data` above
+ // - `Response.Data` in `p2p/server/server.go`
+ // - `NodeIDs` in `MaliciousIDs` above
+ // - `Set` in `EpochActiveSet` in common/types/activation.go
+ // - `EligibilityProofs` in the type `Ballot` in common/types/ballot.go
+ // - `Rewards` in the type `InnerBlock` in common/types/block.go
+ // - `Ballots` in the type `LayerData` below
+ // - `Proposals` in the type `Value` in hare3/types.go
+ // - `Proposals` and `CompactProposals` in the type `Value` in hare4/types.go
AtxIDs []types.ATXID `scale:"max=8000000"`
}

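The comment above ties the `scale:"max=8000000"` bound on `AtxIDs` to matching limits in several other types. A minimal sketch of what such a decode-time bound buys — the decoder and names here are invented for illustration, not go-scale's actual generated code:

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// maxAtxIDs mirrors the scale tag above; per the comment, it must stay in
// sync with the limits in the other listed types.
const maxAtxIDs = 8_000_000

// decodeIDCount is a hypothetical length-prefix check, not go-scale's API.
func decodeIDCount(prefix []byte) (uint32, error) {
	if len(prefix) < 4 {
		return 0, errors.New("short buffer")
	}
	n := binary.LittleEndian.Uint32(prefix)
	if n > maxAtxIDs {
		// refuse before allocating n entries: a malicious peer cannot
		// force a huge allocation with a tiny message
		return 0, fmt.Errorf("too many ATX IDs: %d > %d", n, maxAtxIDs)
	}
	return n, nil
}

func main() {
	var buf [4]byte
	binary.LittleEndian.PutUint32(buf[:], 9_000_000)
	_, err := decodeIDCount(buf[:])
	fmt.Println(err) // too many ATX IDs: 9000000 > 8000000
}
```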
23 changes: 14 additions & 9 deletions hare3/hare_test.go
@@ -6,6 +6,7 @@ import (
"math/rand"
"os"
"runtime/pprof"
"strconv"
"strings"
"sync"
"testing"
@@ -22,6 +23,7 @@ import (
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/hare3/eligibility"
"github.com/spacemeshos/go-spacemesh/layerpatrol"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/p2p/pubsub"
pmocks "github.com/spacemeshos/go-spacemesh/p2p/pubsub/mocks"
"github.com/spacemeshos/go-spacemesh/proposals/store"
@@ -211,7 +213,6 @@ func (n *node) withPublisher() *node {

func (n *node) withHare() *node {
logger := zaptest.NewLogger(n.t).Named(fmt.Sprintf("hare=%d", n.i))
-
n.nclock = &testNodeClock{
genesis: n.t.start,
layerDuration: n.t.layerDuration,
@@ -259,6 +260,10 @@ func (n *node) storeAtx(atx *types.ActivationTx) error {
return nil
}

+ func (n *node) peerId() p2p.Peer {
+ return p2p.Peer(strconv.Itoa(n.i))
+ }
+
type clusterOpt func(*lockstepCluster)

func withUnits(min, max int) clusterOpt {
@@ -276,14 +281,15 @@ func withProposals(fraction float64) clusterOpt {
}

// withSigners creates N signers in addition to regular active nodes.
- // this signeres will be partitioned in fair fashion across regular active nodes.
+ // this signers will be partitioned in fair fashion across regular active nodes.
func withSigners(n int) clusterOpt {
return func(cluster *lockstepCluster) {
cluster.signersCount = n
}
}

func newLockstepCluster(t *tester, opts ...clusterOpt) *lockstepCluster {
+ t.Helper()
cluster := &lockstepCluster{t: t}
cluster.units.min = 10
cluster.units.max = 10
@@ -316,9 +322,7 @@ type lockstepCluster struct {

func (cl *lockstepCluster) addNode(n *node) {
n.hare.Start()
- cl.t.Cleanup(func() {
- n.hare.Stop()
- })
+ cl.t.Cleanup(n.hare.Stop)
cl.nodes = append(cl.nodes, n)
}

@@ -449,7 +453,7 @@ func (cl *lockstepCluster) setup() {
Publish(gomock.Any(), gomock.Any(), gomock.Any()).
Do(func(ctx context.Context, _ string, msg []byte) error {
for _, other := range cl.nodes {
- other.hare.Handler(ctx, "self", msg)
+ other.hare.Handler(ctx, n.peerId(), msg)
}
return nil
}).
@@ -512,7 +516,7 @@ func waitForChan[T any](t testing.TB, ch <-chan T, timeout time.Duration, failur
var value T
select {
case <-time.After(timeout):
- builder := strings.Builder{}
+ var builder strings.Builder
pprof.Lookup("goroutine").WriteTo(&builder, 2)
t.Fatalf(failureMsg+", waited: %v, stacktraces:\n%s", timeout, builder.String())
case value = <-ch:
@@ -523,7 +527,7 @@ func sendWithTimeout[T any](t testing.TB, value T, ch chan<- T, timeout time.Duration, failur
func sendWithTimeout[T any](t testing.TB, value T, ch chan<- T, timeout time.Duration, failureMsg string) {
select {
case <-time.After(timeout):
- builder := strings.Builder{}
+ var builder strings.Builder
pprof.Lookup("goroutine").WriteTo(&builder, 2)
t.Fatalf(failureMsg+", waited: %v, stacktraces:\n%s", timeout, builder.String())
case ch <- value:
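Both helpers above share a pattern worth noting: when the channel operation times out, they dump the stacks of every live goroutine before failing, so a hung test shows where it is stuck from the CI log alone. A self-contained sketch of the same pattern, with invented names:

```go
package main

import (
	"fmt"
	"runtime/pprof"
	"strings"
	"time"
)

// recv waits for a value with a deadline; on timeout it captures all
// goroutine stacks (debug level 2 prints full stack traces).
func recv[T any](ch <-chan T, timeout time.Duration) (T, error) {
	var zero T
	select {
	case v := <-ch:
		return v, nil
	case <-time.After(timeout):
		var b strings.Builder
		pprof.Lookup("goroutine").WriteTo(&b, 2)
		return zero, fmt.Errorf("timed out after %v, stacktraces:\n%s", timeout, b.String())
	}
}

func main() {
	ch := make(chan int) // never written to
	_, err := recv(ch, 100*time.Millisecond)
	fmt.Println(err)
}
```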
@@ -562,6 +566,7 @@ func (t *testTracer) OnMessageSent(m *Message) {
func (*testTracer) OnMessageReceived(*Message) {}

func testHare(t *testing.T, active, inactive, equivocators int, opts ...clusterOpt) {
+ t.Helper()
cfg := DefaultConfig()
cfg.LogStats = true
tst := &tester{
@@ -912,7 +917,7 @@ func TestProposals(t *testing.T) {
atxsdata.AddFromAtx(&atx, false)
}
for _, proposal := range tc.proposals {
- proposals.Add(proposal)
+ require.NoError(t, proposals.Add(proposal))
}
for _, id := range tc.malicious {
require.NoError(t, identities.SetMalicious(db, id, []byte("non empty"), time.Time{}))
98 changes: 37 additions & 61 deletions hare4/hare_test.go
@@ -17,7 +17,6 @@ import (
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest"

Expand All @@ -44,8 +43,6 @@ import (

const layersPerEpoch = 4

- var wait = 10 * time.Second
-
func TestMain(m *testing.M) {
types.SetLayersPerEpoch(layersPerEpoch)
res := m.Run()
@@ -245,7 +242,6 @@ func (n *node) withHare() *node {
} else {
verify = signing.NewEdVerifier()
}
- z, _ := zap.NewDevelopment()
n.hare = New(
n.nclock,
n.mpublisher,
@@ -262,7 +258,6 @@
WithWallClock(n.clock),
WithTracer(tracer),
WithServer(n.mockStreamRequester),
- WithLogger(z),
)
n.register(n.signer)
return n
@@ -322,7 +317,7 @@ func withProposals(fraction float64) clusterOpt {
}

// withSigners creates N signers in addition to regular active nodes.
- // this signeres will be partitioned in fair fashion across regular active nodes.
+ // this signers will be partitioned in fair fashion across regular active nodes.
func withSigners(n int) clusterOpt {
return func(cluster *lockstepCluster) {
cluster.signersCount = n
@@ -351,7 +346,8 @@ type lockstepCluster struct {

mockVerify bool
collidingProposals bool
- units struct {
+
+ units struct {
min, max int
}
proposals struct {
@@ -365,9 +361,7 @@

func (cl *lockstepCluster) addNode(n *node) {
n.hare.Start()
- cl.t.Cleanup(func() {
- n.hare.Stop()
- })
+ cl.t.Cleanup(n.hare.Stop)
cl.nodes = append(cl.nodes, n)
}

@@ -451,9 +445,7 @@ func (cl *lockstepCluster) activeSet() types.ATXIDList {
func (cl *lockstepCluster) genProposalNode(lid types.LayerID, node int) {
active := cl.activeSet()
n := cl.nodes[node]
- if n.atx == nil {
- panic("shouldnt happen")
- }
+ require.NotNil(cl.t.TB, n.atx)
proposal := &types.Proposal{}
proposal.Layer = lid
proposal.EpochData = &types.EpochData{
@@ -637,7 +629,7 @@ func waitForChan[T any](t testing.TB, ch <-chan T, timeout time.Duration, failur
var value T
select {
case <-time.After(timeout):
- builder := strings.Builder{}
+ var builder strings.Builder
pprof.Lookup("goroutine").WriteTo(&builder, 2)
t.Fatalf(failureMsg+", waited: %v, stacktraces:\n%s", timeout, builder.String())
case value = <-ch:
@@ -648,23 +640,23 @@ func sendWithTimeout[T any](t testing.TB, value T, ch chan<- T, timeout time.Duration, failur
func sendWithTimeout[T any](t testing.TB, value T, ch chan<- T, timeout time.Duration, failureMsg string) {
select {
case <-time.After(timeout):
- builder := strings.Builder{}
+ var builder strings.Builder
pprof.Lookup("goroutine").WriteTo(&builder, 2)
t.Fatalf(failureMsg+", waited: %v, stacktraces:\n%s", timeout, builder.String())
case ch <- value:
}
}

func (t *testTracer) waitStopped() types.LayerID {
- return waitForChan(t.TB, t.stopped, wait, "didn't stop")
+ return waitForChan(t.TB, t.stopped, 10*time.Second, "didn't stop")
}

func (t *testTracer) waitEligibility() []*types.HareEligibility {
- return waitForChan(t.TB, t.eligibility, wait, "no eligibility")
+ return waitForChan(t.TB, t.eligibility, 10*time.Second, "no eligibility")
}

func (t *testTracer) waitSent() *Message {
- return waitForChan(t.TB, t.sent, wait, "no message")
+ return waitForChan(t.TB, t.sent, 10*time.Second, "no message")
}

func (*testTracer) OnStart(types.LayerID) {}
@@ -677,21 +669,21 @@ func (t *testTracer) OnStop(lid types.LayerID) {
}

func (t *testTracer) OnActive(el []*types.HareEligibility) {
- sendWithTimeout(t.TB, el, t.eligibility, wait, "eligibility can't be sent")
+ sendWithTimeout(t.TB, el, t.eligibility, 10*time.Second, "eligibility can't be sent")
}

func (t *testTracer) OnMessageSent(m *Message) {
- sendWithTimeout(t.TB, m, t.sent, wait, "message can't be sent")
+ sendWithTimeout(t.TB, m, t.sent, 10*time.Second, "message can't be sent")
}

func (*testTracer) OnMessageReceived(*Message) {}

func (t *testTracer) OnCompactIdRequest(*CompactIdRequest) {
- sendWithTimeout(t.TB, struct{}{}, t.compactReq, wait, "compact req can't be sent")
+ sendWithTimeout(t.TB, struct{}{}, t.compactReq, 10*time.Second, "compact req can't be sent")
}

func (t *testTracer) OnCompactIdResponse(*CompactIdResponse) {
- sendWithTimeout(t.TB, struct{}{}, t.compactResp, wait, "compact resp can't be sent")
+ sendWithTimeout(t.TB, struct{}{}, t.compactResp, 10*time.Second, "compact resp can't be sent")
}

func testHare(t *testing.T, active, inactive, equivocators int, opts ...clusterOpt) {
@@ -886,13 +878,9 @@ func TestHandler(t *testing.T) {
p1.Initialize()
p2.Initialize()

- if err := n.hare.OnProposal(p1); err != nil {
- panic(err)
- }
+ require.NoError(t, n.hare.OnProposal(p1))
+ require.NoError(t, n.hare.OnProposal(p2))

- if err := n.hare.OnProposal(p2); err != nil {
- panic(err)
- }
msg1 := &Message{}
msg1.Layer = layer
msg1.Value.Proposals = []types.ProposalID{p1.ID()}
@@ -1088,9 +1076,7 @@ func TestProposals(t *testing.T) {
atxsdata.AddFromAtx(&atx, false)
}
for _, proposal := range tc.proposals {
- if err := proposals.Add(proposal); err != nil {
- panic(err)
- }
+ require.NoError(t, proposals.Add(proposal))
}
for _, id := range tc.malicious {
require.NoError(t, identities.SetMalicious(db, id, []byte("non empty"), time.Time{}))
@@ -1165,8 +1151,9 @@ func TestHare_ReconstructForward(t *testing.T) {
beacon: types.Beacon{1, 1, 1, 1},
genesis: types.GetEffectiveGenesis(),
}
+ const numNodes = 3
cluster := newLockstepCluster(tst).
- addActive(3)
+ addActive(numNodes)
if cluster.signersCount > 0 {
cluster = cluster.addSigner(cluster.signersCount)
cluster.partitionSigners()
@@ -1176,7 +1163,7 @@

// cluster setup
active := cluster.activeSet()
- for i, n := range cluster.nodes {
+ for _, n := range cluster.nodes {
require.NoError(cluster.t, beacons.Add(n.db, cluster.t.genesis.GetEpoch()+1, cluster.t.beacon))
for _, other := range append(cluster.nodes, cluster.signers...) {
if other.atx == nil {
@@ -1197,41 +1184,30 @@
m := &Message{}
codec.MustDecode(msg, m)
if m.Body.IterRound.Round == preround {
- other := [2]int{0, 0}
- switch i {
- case 0:
- other[0] = 1
- other[1] = 2
- case 1:
- other[0] = 0
- other[1] = 2
- case 2:
- other[0] = 1
- other[1] = 0
- default:
- panic("bad")
- }
- if err := cluster.nodes[other[0]].hare.
- Handler(ctx, cluster.nodes[i].peerId(), msg); err != nil {
- panic(err)
- }
- if err := cluster.nodes[other[1]].hare.
- Handler(ctx, cluster.nodes[other[0]].peerId(), msg); err != nil {
- panic(err)
+ for j := 1; j < numNodes; j++ {
+ other := (n.i + j) % numNodes // other iterates over the other nodes
+ sender := (other - 1 + numNodes) % numNodes // sender is the previous node
+ require.Eventually(t, func() bool {
+ cluster.nodes[other].hare.mu.Lock()
+ defer cluster.nodes[other].hare.mu.Unlock()
+ _, registered := cluster.nodes[other].hare.sessions[m.Layer]
+ return registered
+ }, 5*time.Second, 50*time.Millisecond, fmt.Sprintf("node %d did not register in time", other))
+
+ require.NoError(t, cluster.nodes[other].hare.Handler(ctx, cluster.nodes[sender].peerId(), msg))
}
return nil
}

for _, other := range cluster.nodes {
- if err := other.hare.Handler(ctx, n.peerId(), msg); err != nil {
- panic(err)
- }
+ require.NoError(t, other.hare.Handler(ctx, n.peerId(), msg))
}
return nil
}).
AnyTimes()
- n.mockStreamRequester.EXPECT().StreamRequest(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(
- func(ctx context.Context, p p2p.Peer, msg []byte, cb server.StreamRequestCallback, _ ...string) error {
+
+ n.mockStreamRequester.EXPECT().StreamRequest(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
+ Do(func(ctx context.Context, p p2p.Peer, msg []byte, cb server.StreamRequestCallback, _ ...string) error {
for _, other := range cluster.nodes {
if other.peerId() == p {
b := make([]byte, 0, 1024)
@@ -1245,8 +1221,8 @@ func TestHare_ReconstructForward(t *testing.T) {
}
}
return nil
- },
- ).AnyTimes()
+ }).
+ AnyTimes()
}

cluster.genProposals(layer, 2)
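The core of the fix is the forwarding loop above: instead of a hard-coded switch over three node indices, each preround message now visits the other nodes in ring order, and each delivery waits (via `require.Eventually`) until the receiving node has registered a session for the layer before invoking its handler. A standalone sketch of the ring arithmetic — `forwardingOrder` is an invented name, reproducing only the index math from the test:

```go
package main

import "fmt"

// forwardingOrder yields the (sender, recipient) hops for a message
// originating at node i in a cluster of n nodes, visiting the remaining
// n-1 nodes in ring order, each hop forwarded by the previous node.
func forwardingOrder(i, n int) [][2]int {
	hops := make([][2]int, 0, n-1)
	for j := 1; j < n; j++ {
		other := (i + j) % n          // next recipient in ring order
		sender := (other - 1 + n) % n // the node that forwards to it
		hops = append(hops, [2]int{sender, other})
	}
	return hops
}

func main() {
	for i := 0; i < 3; i++ {
		// e.g. origin 0: [[0 1] [1 2]] — node 0 sends to 1, node 1 forwards to 2
		fmt.Printf("origin %d: %v\n", i, forwardingOrder(i, 3))
	}
}
```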
