Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sync): ignore block request if blocks are already inside the cache #817

Merged
merged 2 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 219 additions & 41 deletions sync/handler_blocks_response_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
package sync

import (
"fmt"
"io"
"testing"
"time"

"github.com/pactus-project/pactus/consensus"
"github.com/pactus-project/pactus/crypto/bls"
"github.com/pactus-project/pactus/network"
"github.com/pactus-project/pactus/state"
"github.com/pactus-project/pactus/store"
"github.com/pactus-project/pactus/sync/bundle/message"
"github.com/pactus-project/pactus/sync/peerset"
"github.com/pactus-project/pactus/sync/peerset/service"
"github.com/pactus-project/pactus/types/block"
"github.com/pactus-project/pactus/types/tx"
"github.com/pactus-project/pactus/util"
"github.com/pactus-project/pactus/util/logger"
"github.com/pactus-project/pactus/util/testsuite"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -139,65 +146,236 @@ func TestStrippedPublicKey(t *testing.T) {
}
}

func shouldPublishBlockRequest(t *testing.T, net *network.MockNetwork, from uint32) {
t.Helper()

bdl := shouldPublishMessageWithThisType(t, net, message.TypeBlocksRequest)
msg := bdl.Message.(*message.BlocksRequestMessage)
require.Equal(t, from, msg.From)
}

func shouldPublishBlockResponse(t *testing.T, net *network.MockNetwork,
from, count uint32, code message.ResponseCode,
) {
t.Helper()

bdl := shouldPublishMessageWithThisType(t, net, message.TypeBlocksResponse)
msg := bdl.Message.(*message.BlocksResponseMessage)
require.Equal(t, from, msg.From)
require.Equal(t, count, msg.Count())
require.Equal(t, code, msg.ResponseCode)
}

type networkAliceBob struct {
*testsuite.TestSuite

stateAlice *state.MockState
stateBob *state.MockState
networkAlice *network.MockNetwork
networkBob *network.MockNetwork
syncAlice *synchronizer
syncBob *synchronizer
}

func makeAliceAndBobNetworks(t *testing.T) *networkAliceBob {
t.Helper()

ts := testsuite.NewTestSuite(t)

configAlice := testConfig()
configBob := testConfig()

valKeyAlice := []*bls.ValidatorKey{ts.RandValKey()}
valKeyBob := []*bls.ValidatorKey{ts.RandValKey()}
stateAlice := state.MockingState(ts)
stateBob := state.MockingState(ts)
consMgrAlice, _ := consensus.MockingManager(ts, valKeyAlice)
consMgrBob, _ := consensus.MockingManager(ts, valKeyBob)
internalMessageCh := make(chan message.Message, 1000)
networkAlice := network.MockingNetwork(ts, ts.RandPeerID())
networkBob := network.MockingNetwork(ts, ts.RandPeerID())

configBob.NodeNetwork = true
networkAlice.AddAnotherNetwork(networkBob)
networkBob.AddAnotherNetwork(networkAlice)

sync1, err := NewSynchronizer(configAlice,
valKeyAlice,
stateAlice,
consMgrAlice,
networkAlice,
internalMessageCh,
)
assert.NoError(t, err)
syncAlice := sync1.(*synchronizer)

sync2, err := NewSynchronizer(configBob,
valKeyBob,
stateBob,
consMgrBob,
networkBob,
internalMessageCh,
)
assert.NoError(t, err)
syncBob := sync2.(*synchronizer)

// -------------------------------
// Better logging during testing
overrideLogger := func(sync *synchronizer, name string) {
sync.logger = logger.NewSubLogger("_sync",
testsuite.NewOverrideStringer(fmt.Sprintf("%s - %s: ", name, t.Name()), sync))
}

overrideLogger(syncAlice, "Alice")
overrideLogger(syncBob, "Bob")
// -------------------------------

assert.NoError(t, syncAlice.Start())
assert.NoError(t, syncBob.Start())

// Verify that Hello messages are exchanged between Alice and Bob
assert.NoError(t, syncAlice.sayHello(syncBob.SelfID()))
assert.NoError(t, syncBob.sayHello(syncAlice.SelfID()))

shouldPublishMessageWithThisType(t, networkAlice, message.TypeHello)
shouldPublishMessageWithThisType(t, networkBob, message.TypeHello)

shouldPublishMessageWithThisType(t, networkBob, message.TypeHelloAck)
shouldPublishMessageWithThisType(t, networkAlice, message.TypeHelloAck)

// Ensure peers are connected and block heights are correct
require.Eventually(t, func() bool {
return syncAlice.PeerSet().Len() == 1 &&
syncBob.PeerSet().Len() == 1
}, time.Second, 100*time.Millisecond)

require.Equal(t, peerset.StatusCodeKnown, syncAlice.PeerSet().GetPeer(syncBob.SelfID()).Status)
require.Equal(t, peerset.StatusCodeKnown, syncBob.PeerSet().GetPeer(syncAlice.SelfID()).Status)

return &networkAliceBob{
TestSuite: ts,
syncAlice: syncAlice,
stateAlice: stateAlice,
networkAlice: networkAlice,
syncBob: syncBob,
stateBob: stateBob,
networkBob: networkBob,
}
}

// TestIdenticalBundles tests if two different peers publish the same message,
// whether the bundle data is also the same.
func TestIdenticalBundles(t *testing.T) {
td := makeAliceAndBobNetworks(t)

blk, cert := td.GenerateTestBlock(td.RandHeight())
msg := message.NewBlockAnnounceMessage(blk, cert)

bdlAlice := td.syncAlice.prepareBundle(msg)
bdlBob := td.syncBob.prepareBundle(msg)

assert.Equal(t, bdlAlice, bdlBob)
}

// TestSyncing is an important test to verify the syncing process between two
// test nodes, Alice and Bob. In real-world scenarios, multiple nodes are typically
// involved, but the procedure remains similar.
func TestSyncing(t *testing.T) {
ts, syncAlice, networkAlice, syncBob, networkBob := makeAliceAndBobNetworks(t)
td := makeAliceAndBobNetworks(t)

// Adding 100 blocks for Bob
blockInterval := syncBob.state.Genesis().Params().BlockInterval()
blockTime := util.RoundNow(int(blockInterval.Seconds()))
blockInterval := td.syncBob.state.Genesis().Params().BlockInterval()
blockTime := td.syncBob.state.Genesis().GenesisTime()
for i := uint32(0); i < 100; i++ {
blk, cert := ts.GenerateTestBlockWithTime(i+1, blockTime)
assert.NoError(t, syncBob.state.CommitBlock(blk, cert))
blk, cert := td.GenerateTestBlockWithTime(i+1, blockTime)
assert.NoError(t, td.syncBob.state.CommitBlock(blk, cert))

blockTime = blockTime.Add(blockInterval)
}

assert.Equal(t, uint32(0), syncAlice.state.LastBlockHeight())
assert.Equal(t, uint32(100), syncBob.state.LastBlockHeight())
assert.Equal(t, uint32(0), td.syncAlice.state.LastBlockHeight())
assert.Equal(t, uint32(100), td.syncBob.state.LastBlockHeight())

// Announcing a block
blk, cert := ts.GenerateTestBlock(ts.RandHeight())
blk, cert := td.GenerateTestBlock(td.RandHeight())
msg := message.NewBlockAnnounceMessage(blk, cert)
syncBob.broadcast(msg)
shouldPublishMessageWithThisType(t, networkBob, message.TypeBlockAnnounce)
td.syncBob.broadcast(msg)
shouldPublishMessageWithThisType(t, td.networkBob, message.TypeBlockAnnounce)

// Perform block syncing
shouldNotPublishMessageWithThisType(t, networkBob, message.TypeBlocksRequest)
shouldPublishMessageWithThisType(t, networkAlice, message.TypeBlocksRequest)
shouldPublishMessageWithThisType(t, networkBob, message.TypeBlocksResponse) // 1-11
shouldPublishMessageWithThisType(t, networkBob, message.TypeBlocksResponse) // 12-22
shouldPublishMessageWithThisType(t, networkBob, message.TypeBlocksResponse) // 23-23
shouldPublishMessageWithThisType(t, networkBob, message.TypeBlocksResponse) // NoMoreBlock

shouldPublishMessageWithThisType(t, networkAlice, message.TypeBlocksRequest)
shouldPublishMessageWithThisType(t, networkBob, message.TypeBlocksResponse) // 24-34
shouldPublishMessageWithThisType(t, networkBob, message.TypeBlocksResponse) // 35-45
shouldPublishMessageWithThisType(t, networkBob, message.TypeBlocksResponse) // 46-46
shouldPublishMessageWithThisType(t, networkBob, message.TypeBlocksResponse) // NoMoreBlock

shouldPublishMessageWithThisType(t, networkAlice, message.TypeBlocksRequest)
shouldPublishMessageWithThisType(t, networkBob, message.TypeBlocksResponse) // 47-57
shouldPublishMessageWithThisType(t, networkBob, message.TypeBlocksResponse) // 58-68
shouldPublishMessageWithThisType(t, networkBob, message.TypeBlocksResponse) // 69-69
shouldPublishMessageWithThisType(t, networkBob, message.TypeBlocksResponse) // NoMoreBlock

shouldPublishMessageWithThisType(t, networkAlice, message.TypeBlocksRequest)
shouldPublishMessageWithThisType(t, networkBob, message.TypeBlocksResponse) // 70-80
shouldPublishMessageWithThisType(t, networkBob, message.TypeBlocksResponse) // 81-91
shouldPublishMessageWithThisType(t, networkBob, message.TypeBlocksResponse) // 92-92
shouldPublishMessageWithThisType(t, networkBob, message.TypeBlocksResponse) // NoMoreBlock
assert.Equal(t, uint32(11), td.syncAlice.config.BlockPerMessage)
assert.Equal(t, uint32(23), td.syncAlice.config.LatestBlockInterval)

shouldNotPublishMessageWithThisType(t, td.networkBob, message.TypeBlocksRequest)
shouldPublishBlockRequest(t, td.networkAlice, 1)
shouldPublishBlockResponse(t, td.networkBob, 1, 11, message.ResponseCodeMoreBlocks) // 1-11
shouldPublishBlockResponse(t, td.networkBob, 12, 11, message.ResponseCodeMoreBlocks) // 12-22
shouldPublishBlockResponse(t, td.networkBob, 23, 1, message.ResponseCodeMoreBlocks) // 23-23
shouldPublishBlockResponse(t, td.networkBob, 0, 0, message.ResponseCodeNoMoreBlocks) // NoMoreBlock

shouldPublishBlockRequest(t, td.networkAlice, 24)
shouldPublishBlockResponse(t, td.networkBob, 24, 11, message.ResponseCodeMoreBlocks) // 24-34
shouldPublishBlockResponse(t, td.networkBob, 35, 11, message.ResponseCodeMoreBlocks) // 35-45
shouldPublishBlockResponse(t, td.networkBob, 46, 1, message.ResponseCodeMoreBlocks) // 46-46
shouldPublishBlockResponse(t, td.networkBob, 0, 0, message.ResponseCodeNoMoreBlocks) // NoMoreBlock

shouldPublishBlockRequest(t, td.networkAlice, 47)
shouldPublishBlockResponse(t, td.networkBob, 47, 11, message.ResponseCodeMoreBlocks) // 47-57
shouldPublishBlockResponse(t, td.networkBob, 58, 11, message.ResponseCodeMoreBlocks) // 58-68
shouldPublishBlockResponse(t, td.networkBob, 69, 1, message.ResponseCodeMoreBlocks) // 69-69
shouldPublishBlockResponse(t, td.networkBob, 0, 0, message.ResponseCodeNoMoreBlocks) // NoMoreBlock

shouldPublishBlockRequest(t, td.networkAlice, 70)
shouldPublishBlockResponse(t, td.networkBob, 70, 11, message.ResponseCodeMoreBlocks) // 70-80
shouldPublishBlockResponse(t, td.networkBob, 81, 11, message.ResponseCodeMoreBlocks) // 81-91
shouldPublishBlockResponse(t, td.networkBob, 92, 1, message.ResponseCodeMoreBlocks) // 92-92
shouldPublishBlockResponse(t, td.networkBob, 0, 0, message.ResponseCodeNoMoreBlocks) // NoMoreBlock

// Last block requests
shouldPublishMessageWithThisType(t, networkAlice, message.TypeBlocksRequest)
shouldPublishMessageWithThisType(t, networkBob, message.TypeBlocksResponse) // 93-100
bdl := shouldPublishMessageWithThisType(t, networkBob, message.TypeBlocksResponse) // Synced
assert.Equal(t, bdl.Message.(*message.BlocksResponseMessage).ResponseCode, message.ResponseCodeSynced)
shouldPublishBlockRequest(t, td.networkAlice, 93) // 93-116
shouldPublishBlockResponse(t, td.networkBob, 93, 8, message.ResponseCodeMoreBlocks) // 93-100
shouldPublishBlockResponse(t, td.networkBob, 100, 0, message.ResponseCodeSynced) // Synced

// Alice needs more time to process all the bundles,
// but the block height should be greater than zero
assert.Greater(t, syncAlice.state.LastBlockHeight(), uint32(20))
assert.Equal(t, syncBob.state.LastBlockHeight(), uint32(100))
assert.Greater(t, td.syncAlice.state.LastBlockHeight(), uint32(20))
assert.Equal(t, td.syncBob.state.LastBlockHeight(), uint32(100))
}

func TestSyncingHasBlockInCache(t *testing.T) {
td := makeAliceAndBobNetworks(t)

// Adding 100 blocks for Bob
blockInterval := td.syncBob.state.Genesis().Params().BlockInterval()
blockTime := td.syncBob.state.Genesis().GenesisTime()
for i := uint32(0); i < 23; i++ {
blk, cert := td.GenerateTestBlockWithTime(i+1, blockTime)
assert.NoError(t, td.syncBob.state.CommitBlock(blk, cert))

blockTime = blockTime.Add(blockInterval)
}

assert.Equal(t, uint32(0), td.syncAlice.state.LastBlockHeight())
assert.Equal(t, uint32(23), td.syncBob.state.LastBlockHeight())

// Adding some blocs to the cache
blk1 := td.stateBob.TestStore.Blocks[1]
blk2 := td.stateBob.TestStore.Blocks[2]
blk3 := td.stateBob.TestStore.Blocks[3]
td.syncAlice.cache.AddBlock(blk1)
td.syncAlice.cache.AddBlock(blk2)
td.syncAlice.cache.AddBlock(blk3)

// Announcing a block
blk, cert := td.GenerateTestBlock(td.RandHeight())
msg := message.NewBlockAnnounceMessage(blk, cert)
td.syncBob.broadcast(msg)
shouldPublishMessageWithThisType(t, td.networkBob, message.TypeBlockAnnounce)

shouldNotPublishMessageWithThisType(t, td.networkBob, message.TypeBlocksRequest)
// blocks 1-2 are inside the cache
shouldPublishBlockRequest(t, td.networkAlice, 4)
shouldPublishBlockResponse(t, td.networkBob, 4, 11, message.ResponseCodeMoreBlocks) // 4-14
shouldPublishBlockResponse(t, td.networkBob, 15, 9, message.ResponseCodeMoreBlocks) // 15-23
shouldPublishBlockResponse(t, td.networkBob, 23, 0, message.ResponseCodeSynced) // Synced
}
7 changes: 3 additions & 4 deletions sync/peerset/peer_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,11 @@ func TestOpenSession(t *testing.T) {
ps := NewPeerSet(time.Minute)

pid := peer.ID("peer1")
ssn := ps.OpenSession(pid, 100, 101)
ssn := ps.OpenSession(pid, 100, 1)

assert.NotNil(t, ssn)
from, to := ssn.Range()
assert.Equal(t, uint32(100), from)
assert.Equal(t, uint32(101), to)
assert.Equal(t, uint32(100), ssn.From)
assert.Equal(t, uint32(1), ssn.Count)
assert.Equal(t, pid, ssn.PeerID)
assert.Equal(t, session.Open, ssn.Status)
assert.LessOrEqual(t, ssn.StartedAt, time.Now())
Expand Down
10 changes: 3 additions & 7 deletions sync/peerset/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,17 @@ type Session struct {
Status Status
PeerID peer.ID
From uint32
To uint32
Count uint32
StartedAt time.Time
}

func NewSession(id int, peerID peer.ID, from, to uint32) *Session {
func NewSession(id int, peerID peer.ID, from, count uint32) *Session {
return &Session{
SessionID: id,
Status: Open,
PeerID: peerID,
From: from,
To: to,
Count: count,
StartedAt: util.Now(),
}
}

func (s *Session) Range() (uint32, uint32) {
return s.From, s.To
}
Loading
Loading