Skip to content

Commit

Permalink
fix(dot/sync): Gossip BlockAnnounceMessage only after successfully …
Browse files Browse the repository at this point in the history
…imported (#2885)

Co-authored-by: Timothy Wu <timwu20@gmail.com>
  • Loading branch information
EclesioMeloJunior and timwu20 authored Oct 21, 2022
1 parent 405db51 commit 69031a6
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 40 deletions.
45 changes: 34 additions & 11 deletions dot/core/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,40 +127,63 @@ func (s *Service) StorageRoot() (common.Hash, error) {

// HandleBlockImport handles a block that was imported via the network
func (s *Service) HandleBlockImport(block *types.Block, state *rtstorage.TrieState) error {
return s.handleBlock(block, state)
err := s.handleBlock(block, state)
if err != nil {
return fmt.Errorf("handling block: %w", err)
}

bestBlockHash := s.blockState.BestBlockHash()
isBestBlock := bestBlockHash.Equal(block.Header.Hash())

blockAnnounce, err := createBlockAnnounce(block, isBestBlock)
if err != nil {
return fmt.Errorf("creating block announce: %w", err)
}

s.net.GossipMessage(blockAnnounce)
return nil
}

// HandleBlockProduced handles a block that was produced by us
// It is handled the same as an imported block in terms of state updates; the only difference
// is we send a BlockAnnounceMessage to our peers.
func (s *Service) HandleBlockProduced(block *types.Block, state *rtstorage.TrieState) error {
if err := s.handleBlock(block, state); err != nil {
return err
err := s.handleBlock(block, state)
if err != nil {
return fmt.Errorf("handling block: %w", err)
}

blockAnnounce, err := createBlockAnnounce(block, true)
if err != nil {
return fmt.Errorf("creating block announce: %w", err)
}

s.net.GossipMessage(blockAnnounce)
return nil
}

func createBlockAnnounce(block *types.Block, isBestBlock bool) (
blockAnnounce *network.BlockAnnounceMessage, err error) {
digest := types.NewDigest()
for i := range block.Header.Digest.Types {
digestValue, err := block.Header.Digest.Types[i].Value()
if err != nil {
return fmt.Errorf("getting value of digest type at index %d: %w", i, err)
return nil, fmt.Errorf("getting value of digest type at index %d: %w", i, err)
}
err = digest.Add(digestValue)
if err != nil {
return err
return nil, fmt.Errorf("adding digest value for type at index %d: %w", i, err)
}
}

msg := &network.BlockAnnounceMessage{
return &network.BlockAnnounceMessage{
ParentHash: block.Header.ParentHash,
Number: block.Header.Number,
StateRoot: block.Header.StateRoot,
ExtrinsicsRoot: block.Header.ExtrinsicsRoot,
Digest: digest,
BestBlock: true,
}

s.net.GossipMessage(msg)
return nil
BestBlock: isBestBlock,
}, nil
}

func (s *Service) handleBlock(block *types.Block, state *rtstorage.TrieState) error {
Expand Down
4 changes: 2 additions & 2 deletions dot/core/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,9 +480,9 @@ func Test_Service_HandleBlockProduced(t *testing.T) {
t.Parallel()
execTest := func(t *testing.T, s *Service, block *types.Block, trieState *rtstorage.TrieState, expErr error) {
err := s.HandleBlockProduced(block, trieState)
assert.ErrorIs(t, err, expErr)
require.ErrorIs(t, err, expErr)
if expErr != nil {
assert.EqualError(t, err, expErr.Error())
assert.EqualError(t, err, "handling block: "+expErr.Error())
}
}
t.Run("nil input", func(t *testing.T) {
Expand Down
8 changes: 5 additions & 3 deletions dot/network/block_announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/ChainSafe/gossamer/dot/peerset"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/blocktree"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/pkg/scale"

Expand Down Expand Up @@ -208,9 +209,10 @@ func (s *Service) handleBlockAnnounceMessage(from peer.ID, msg NotificationsMess
return false, errors.New("invalid message")
}

if err = s.syncer.HandleBlockAnnounce(from, bam); err != nil {
return false, err
err = s.syncer.HandleBlockAnnounce(from, bam)
if errors.Is(err, blocktree.ErrBlockExists) {
return true, nil
}

return true, nil
return false, err
}
62 changes: 48 additions & 14 deletions dot/network/block_announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"testing"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/blocktree"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/pkg/scale"
gomock "github.com/golang/mock/gomock"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -126,24 +128,56 @@ func TestDecodeBlockAnnounceHandshake(t *testing.T) {
func TestHandleBlockAnnounceMessage(t *testing.T) {
t.Parallel()

config := &Config{
BasePath: t.TempDir(),
Port: availablePort(t),
NoBootstrap: true,
NoMDNS: true,
testCases := map[string]struct {
propagate bool
mockSyncer func(*testing.T, peer.ID, *BlockAnnounceMessage) Syncer
}{
"block already exists": {
mockSyncer: func(t *testing.T, peer peer.ID, blockAnnounceMessage *BlockAnnounceMessage) Syncer {
ctrl := gomock.NewController(t)
syncer := NewMockSyncer(ctrl)
syncer.EXPECT().
HandleBlockAnnounce(peer, blockAnnounceMessage).
Return(blocktree.ErrBlockExists)
return syncer
},
propagate: true,
},
"block does not exists": {
propagate: false,
},
}

s := createTestService(t, config)
for tname, tt := range testCases {
tt := tt

peerID := peer.ID("noot")
msg := &BlockAnnounceMessage{
Number: 10,
Digest: types.NewDigest(),
}
t.Run(tname, func(t *testing.T) {
t.Parallel()

propagate, err := s.handleBlockAnnounceMessage(peerID, msg)
require.NoError(t, err)
require.True(t, propagate)
config := &Config{
BasePath: t.TempDir(),
Port: availablePort(t),
NoBootstrap: true,
NoMDNS: true,
}

peerID := peer.ID("noot")
msg := &BlockAnnounceMessage{
Number: 10,
Digest: types.NewDigest(),
}

if tt.mockSyncer != nil {
config.Syncer = tt.mockSyncer(t, peerID, msg)
}

service := createTestService(t, config)
gotPropagate, err := service.handleBlockAnnounceMessage(peerID, msg)

require.NoError(t, err)
require.Equal(t, tt.propagate, gotPropagate)
})
}
}

func TestValidateBlockAnnounceHandshake(t *testing.T) {
Expand Down
13 changes: 7 additions & 6 deletions dot/sync/chain_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/ChainSafe/gossamer/dot/network"
"github.com/ChainSafe/gossamer/dot/peerset"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/blocktree"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/common/variadic"
)
Expand Down Expand Up @@ -251,7 +252,7 @@ func (cs *chainSync) setBlockAnnounce(from peer.ID, header *types.Header) error
}

if has {
return nil
return blocktree.ErrBlockExists
}

if err = cs.pendingBlocks.addHeader(header); err != nil {
Expand Down Expand Up @@ -627,19 +628,19 @@ func (cs *chainSync) tryDispatchWorker(w *worker) {
// if it fails due to any reason, it sets the worker `err` and returns
// this function always places the worker into the `resultCh` for result handling upon return
func (cs *chainSync) dispatchWorker(w *worker) {
if w.targetNumber == nil || w.startNumber == nil {
return
}

logger.Debugf("dispatching sync worker id %d, "+
"start number %d, target number %d, "+
"start hash %s, target hash %s, "+
"request data %d, direction %s",
w.id,
w.startNumber, w.targetNumber,
*w.startNumber, *w.targetNumber,
w.startHash, w.targetHash,
w.requestData, w.direction)

if w.targetNumber == nil || w.startNumber == nil {
return
}

start := time.Now()
defer func() {
end := time.Now()
Expand Down
78 changes: 74 additions & 4 deletions dot/sync/chain_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ChainSafe/gossamer/dot/network"
"github.com/ChainSafe/gossamer/dot/peerset"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/blocktree"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/common/variadic"
"github.com/ChainSafe/gossamer/lib/trie"
Expand Down Expand Up @@ -1247,20 +1248,23 @@ func Test_chainSync_start(t *testing.T) {
}

func Test_chainSync_setBlockAnnounce(t *testing.T) {
t.Parallel()

type args struct {
from peer.ID
header *types.Header
}
tests := map[string]struct {
chainSyncBuilder func(ctrl *gomock.Controller) chainSync
chainSyncBuilder func(*types.Header, *gomock.Controller) chainSync
args args
wantErr error
}{
"base case": {
wantErr: blocktree.ErrBlockExists,
args: args{
header: &types.Header{Number: 2},
},
chainSyncBuilder: func(ctrl *gomock.Controller) chainSync {
chainSyncBuilder: func(_ *types.Header, ctrl *gomock.Controller) chainSync {
mockBlockState := NewMockBlockState(ctrl)
mockBlockState.EXPECT().HasHeader(common.MustHexToHash(
"0x05bdcc454f60a08d427d05e7f19f240fdc391f570ab76fcb96ecca0b5823d3bf")).Return(true, nil)
Expand All @@ -1271,13 +1275,79 @@ func Test_chainSync_setBlockAnnounce(t *testing.T) {
}
},
},
"err_when_calling_has_header": {
wantErr: errors.New("checking header exists"),
args: args{
header: &types.Header{Number: 2},
},
chainSyncBuilder: func(_ *types.Header, ctrl *gomock.Controller) chainSync {
mockBlockState := NewMockBlockState(ctrl)
mockBlockState.EXPECT().
HasHeader(common.MustHexToHash(
"0x05bdcc454f60a08d427d05e7f19f240fdc391f570ab76fcb96ecca0b5823d3bf")).
Return(false, errors.New("checking header exists"))
mockDisjointBlockSet := NewMockDisjointBlockSet(ctrl)
return chainSync{
blockState: mockBlockState,
pendingBlocks: mockDisjointBlockSet,
}
},
},
"adding_block_header_to_pending_blocks": {
args: args{
header: &types.Header{Number: 2},
},
chainSyncBuilder: func(expectedHeader *types.Header, ctrl *gomock.Controller) chainSync {
argumentHeaderHash := common.MustHexToHash(
"0x05bdcc454f60a08d427d05e7f19f240fdc391f570ab76fcb96ecca0b5823d3bf")

mockBlockState := NewMockBlockState(ctrl)
mockBlockState.EXPECT().
HasHeader(argumentHeaderHash).
Return(false, nil)

mockBlockState.EXPECT().
BestBlockHeader().
Return(&types.Header{Number: 1}, nil)

mockDisjointBlockSet := NewMockDisjointBlockSet(ctrl)
mockDisjointBlockSet.EXPECT().
addHeader(expectedHeader).
Return(nil)

mockDisjointBlockSet.EXPECT().
addHashAndNumber(argumentHeaderHash, uint(2)).
Return(nil)

return chainSync{
blockState: mockBlockState,
pendingBlocks: mockDisjointBlockSet,
peerState: make(map[peer.ID]*peerState),
// creating an buffered channel for this specific test
// since it will put a work on the queue and an unbufered channel
// will hang until we read on this channel and the goal is to
// put the work on the channel and don't block
workQueue: make(chan *peerState, 1),
}
},
},
}
for name, tt := range tests {
tt := tt
t.Run(name, func(t *testing.T) {
t.Parallel()
ctrl := gomock.NewController(t)
sync := tt.chainSyncBuilder(ctrl)
sync := tt.chainSyncBuilder(tt.args.header, ctrl)
err := sync.setBlockAnnounce(tt.args.from, tt.args.header)
assert.ErrorIs(t, err, tt.wantErr)
if tt.wantErr != nil {
assert.EqualError(t, err, tt.wantErr.Error())
} else {
assert.NoError(t, err)
}

if sync.workQueue != nil {
assert.Equal(t, len(sync.workQueue), 1)
}
})
}
}
Expand Down

0 comments on commit 69031a6

Please sign in to comment.