Skip to content

Commit

Permalink
fix(sync): adding sequence number to the bundle (#881)
Browse files Browse the repository at this point in the history
  • Loading branch information
themantre committed Dec 30, 2023
1 parent 2d9f92f commit 02d5692
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 19 deletions.
15 changes: 9 additions & 6 deletions network/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func newGossipService(ctx context.Context, host lp2phost.Host, eventCh chan Even
conf *Config, log *logger.SubLogger,
) *gossipService {
opts := []lp2pps.Option{
lp2pps.WithFloodPublish(true),
lp2pps.WithMessageSignaturePolicy(lp2pps.StrictNoSign),
lp2pps.WithNoAuthor(),
lp2pps.WithMessageIdFn(MessageIDFunc),
Expand All @@ -35,15 +36,17 @@ func newGossipService(ctx context.Context, host lp2phost.Host, eventCh chan Even
opts = append(opts, lp2pps.WithPeerExchange(true))
}

gsParams := lp2pps.DefaultGossipSubParams()
if conf.IsGossiper {
// turn off the mesh in gossiper mode
lp2pps.GossipSubD = 0
lp2pps.GossipSubDscore = 0
lp2pps.GossipSubDlo = 0
lp2pps.GossipSubDhi = 0
lp2pps.GossipSubDout = 0
lp2pps.GossipSubDlazy = conf.ScaledMinConns()
gsParams.D = 0
gsParams.Dscore = 0
gsParams.Dlo = 0
gsParams.Dhi = 0
gsParams.Dout = 0
gsParams.Dlazy = conf.ScaledMinConns()
}
opts = append(opts, lp2pps.WithGossipSubParams(gsParams))

pubsub, err := lp2pps.NewGossipSub(ctx, host, opts...)
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions sync/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ const (
)

type Bundle struct {
Flags int
Message message.Message
Flags int
SequenceNo int
Message message.Message
}

func NewBundle(msg message.Message) *Bundle {
Expand All @@ -43,10 +44,15 @@ func (b *Bundle) CompressIt() {
b.Flags = util.SetFlag(b.Flags, BundleFlagCompressed)
}

func (b *Bundle) SetSequenceNo(seqNo int) {
b.SequenceNo = seqNo
}

type _Bundle struct {
Flags int `cbor:"1,keyasint"`
MessageType message.Type `cbor:"2,keyasint"`
MessageData []byte `cbor:"3,keyasint"`
SequenceNo int `cbor:"4,keyasint"`
}

func (b *Bundle) Encode() ([]byte, error) {
Expand All @@ -66,6 +72,7 @@ func (b *Bundle) Encode() ([]byte, error) {
Flags: b.Flags,
MessageType: b.Message.Type(),
MessageData: data,
SequenceNo: b.SequenceNo,
}

return cbor.Marshal(msg)
Expand Down Expand Up @@ -95,6 +102,7 @@ func (b *Bundle) Decode(r io.Reader) (int, error) {
}

b.Flags = bdl.Flags
b.SequenceNo = bdl.SequenceNo
b.Message = msg
return bytesRead, cbor.Unmarshal(data, msg)
}
13 changes: 11 additions & 2 deletions sync/bundle/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,17 @@ func TestDecodeVoteCBOR(t *testing.T) {
"035879a101a70101" +
"02186403010458200264572d4d6bfcd2140d4f885fd5a32fe42fdbf40551e4ff89f3d235e32b4b92055501c0067d277f2dff" +
"99943016d6a0f379cf09846c6f06f60758308ab7aecbe03c4ed5b688bcb7e848baffa62bcbf1a4021522c56693f0a7bbcc1f" +
"e865277556ee59c1f63ba592acfe1b43")
"e865277556ee59c1f63ba592acfe1b43" +
"0401") // SequenceNo
d2, _ := hex.DecodeString(
"a3" +
"01190100" + // flags: 0x0100 (compressed)
"0207" + // Type (vote)
"0358951f8b08" +
"000000000000ff00790086ffa101a7010102186403010458200264572d4d6bfcd2140d4f885fd5a32fe42fdbf40551e4ff89" +
"f3d235e32b4b92055501c0067d277f2dff99943016d6a0f379cf09846c6f06f60758308ab7aecbe03c4ed5b688bcb7e848ba" +
"ffa62bcbf1a4021522c56693f0a7bbcc1fe865277556ee59c1f63ba592acfe1b43010000ffff798ce7ec79000000")
"ffa62bcbf1a4021522c56693f0a7bbcc1fe865277556ee59c1f63ba592acfe1b43010000ffff798ce7ec79000000" +
"0401") // SequenceNo

bdl1 := new(Bundle)
bdl2 := new(Bundle)
Expand All @@ -102,3 +104,10 @@ func TestDecodeVoteCBOR(t *testing.T) {
assert.Equal(t, bdl1.Message, bdl2.Message)
assert.Contains(t, bdl1.String(), "vote")
}

func TestSetSequenceNo(t *testing.T) {
bdl := new(Bundle)
bdl.SetSequenceNo(1001)

assert.Equal(t, 1001, bdl.SequenceNo)
}
11 changes: 10 additions & 1 deletion sync/peerset/peer_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type PeerSet struct {
sessions map[int]*session.Session
nextSessionID int
sessionTimeout time.Duration
totalSentBundles int
totalSentBytes int64
totalReceivedBytes int64
sentBytes map[message.Type]int64
Expand Down Expand Up @@ -326,10 +327,11 @@ func (ps *PeerSet) IncreaseReceivedBytesCounter(pid peer.ID, msgType message.Typ
ps.receivedBytes[msgType] += c
}

func (ps *PeerSet) IncreaseSentBytesCounter(msgType message.Type, c int64, pid *peer.ID) {
func (ps *PeerSet) IncreaseSentCounters(msgType message.Type, c int64, pid *peer.ID) {
ps.lk.Lock()
defer ps.lk.Unlock()

ps.totalSentBundles++
ps.totalSentBytes += c
ps.sentBytes[msgType] += c

Expand All @@ -339,6 +341,13 @@ func (ps *PeerSet) IncreaseSentBytesCounter(msgType message.Type, c int64, pid *
}
}

func (ps *PeerSet) TotalSentBundles() int {
ps.lk.RLock()
defer ps.lk.RUnlock()

return ps.totalSentBundles
}

func (ps *PeerSet) TotalSentBytes() int64 {
ps.lk.RLock()
defer ps.lk.RUnlock()
Expand Down
5 changes: 3 additions & 2 deletions sync/peerset/peer_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func TestPeerSet(t *testing.T) {
peerSet.IncreaseReceivedBundlesCounter(pid1)
peerSet.IncreaseReceivedBytesCounter(pid1, message.TypeBlocksResponse, 100)
peerSet.IncreaseReceivedBytesCounter(pid1, message.TypeTransactions, 150)
peerSet.IncreaseSentBytesCounter(message.TypeBlocksRequest, 200, nil)
peerSet.IncreaseSentBytesCounter(message.TypeBlocksRequest, 250, &pid1)
peerSet.IncreaseSentCounters(message.TypeBlocksRequest, 200, nil)
peerSet.IncreaseSentCounters(message.TypeBlocksRequest, 250, &pid1)

peer1 := peerSet.getPeer(pid1)

Expand All @@ -100,6 +100,7 @@ func TestPeerSet(t *testing.T) {
assert.Equal(t, peerSet.TotalSentBytes(), int64(450))
assert.Equal(t, peerSet.SentBytesMessageType(message.TypeBlocksRequest), int64(450))
assert.Equal(t, peerSet.SentBytes(), sentBytes)
assert.Equal(t, peerSet.TotalSentBundles(), 2)
})

t.Run("Testing UpdateHeight", func(t *testing.T) {
Expand Down
6 changes: 4 additions & 2 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ func (sync *synchronizer) prepareBundle(msg message.Message) *bundle.Bundle {
// It's localnet and for testing purpose only
}

bdl.SetSequenceNo(sync.peerSet.TotalSentBundles())

return bdl
}
return nil
Expand All @@ -167,7 +169,7 @@ func (sync *synchronizer) sendTo(msg message.Message, to peer.ID) {
}

sync.peerSet.UpdateLastSent(to)
sync.peerSet.IncreaseSentBytesCounter(msg.Type(), int64(len(data)), &to)
sync.peerSet.IncreaseSentCounters(msg.Type(), int64(len(data)), &to)
sync.logger.Info("bundle sent", "bundle", bdl, "to", to)
}
}
Expand All @@ -194,7 +196,7 @@ func (sync *synchronizer) broadcast(msg message.Message) {
} else {
sync.logger.Info("broadcasting new bundle", "bundle", bdl)
}
sync.peerSet.IncreaseSentBytesCounter(msg.Type(), int64(len(data)), nil)
sync.peerSet.IncreaseSentCounters(msg.Type(), int64(len(data)), nil)
}
}

Expand Down
23 changes: 19 additions & 4 deletions sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,20 +373,35 @@ func TestBroadcastBlockAnnounce(t *testing.T) {

t.Run("Should announce the block", func(t *testing.T) {
blk, cert := td.GenerateTestBlock(td.RandHeight())
msg := message.BlockAnnounceMessage{Block: blk, Certificate: cert}
msg := message.NewBlockAnnounceMessage(blk, cert)

td.broadcastCh <- &msg
td.sync.broadcast(msg)

td.shouldPublishMessageWithThisType(t, message.TypeBlockAnnounce)
})

t.Run("Should NOT announce the block", func(t *testing.T) {
blk, cert := td.GenerateTestBlock(td.RandHeight())
msg := message.BlockAnnounceMessage{Block: blk, Certificate: cert}
msg := message.NewBlockAnnounceMessage(blk, cert)

td.sync.cache.AddBlock(blk)
td.broadcastCh <- &msg
td.sync.broadcast(msg)

td.shouldNotPublishMessageWithThisType(t, message.TypeBlockAnnounce)
})
}

func TestBundleSequenceNo(t *testing.T) {
td := setup(t, nil)

msg := message.NewQueryProposalMessage(td.RandHeight(), td.RandValAddress())

td.sync.broadcast(msg)
bdl1 := td.shouldPublishMessageWithThisType(t, message.TypeQueryProposal)
assert.Equal(t, 0, bdl1.SequenceNo)

// Sending the same message again
td.sync.broadcast(msg)
bdl2 := td.shouldPublishMessageWithThisType(t, message.TypeQueryProposal)
assert.Equal(t, 1, bdl2.SequenceNo)
}

0 comments on commit 02d5692

Please sign in to comment.