Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sync): adding sequence number to the bundle #881

Merged
merged 2 commits into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions network/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
conf *Config, log *logger.SubLogger,
) *gossipService {
opts := []lp2pps.Option{
lp2pps.WithFloodPublish(true),
lp2pps.WithMessageSignaturePolicy(lp2pps.StrictNoSign),
lp2pps.WithNoAuthor(),
lp2pps.WithMessageIdFn(MessageIDFunc),
Expand All @@ -35,15 +36,17 @@
opts = append(opts, lp2pps.WithPeerExchange(true))
}

gsParams := lp2pps.DefaultGossipSubParams()
if conf.IsGossiper {
// turn off the mesh in gossiper mode
lp2pps.GossipSubD = 0
lp2pps.GossipSubDscore = 0
lp2pps.GossipSubDlo = 0
lp2pps.GossipSubDhi = 0
lp2pps.GossipSubDout = 0
lp2pps.GossipSubDlazy = conf.ScaledMinConns()
gsParams.D = 0
gsParams.Dscore = 0
gsParams.Dlo = 0
gsParams.Dhi = 0
gsParams.Dout = 0
gsParams.Dlazy = conf.ScaledMinConns()

Check warning on line 47 in network/gossip.go

View check run for this annotation

Codecov / codecov/patch

network/gossip.go#L42-L47

Added lines #L42 - L47 were not covered by tests
}
opts = append(opts, lp2pps.WithGossipSubParams(gsParams))

pubsub, err := lp2pps.NewGossipSub(ctx, host, opts...)
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions sync/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ const (
)

type Bundle struct {
Flags int
Message message.Message
Flags int
SequenceNo int
Message message.Message
}

func NewBundle(msg message.Message) *Bundle {
Expand All @@ -43,10 +44,15 @@ func (b *Bundle) CompressIt() {
b.Flags = util.SetFlag(b.Flags, BundleFlagCompressed)
}

func (b *Bundle) SetSequenceNo(seqNo int) {
b.SequenceNo = seqNo
}

type _Bundle struct {
Flags int `cbor:"1,keyasint"`
MessageType message.Type `cbor:"2,keyasint"`
MessageData []byte `cbor:"3,keyasint"`
SequenceNo int `cbor:"4,keyasint"`
}

func (b *Bundle) Encode() ([]byte, error) {
Expand All @@ -66,6 +72,7 @@ func (b *Bundle) Encode() ([]byte, error) {
Flags: b.Flags,
MessageType: b.Message.Type(),
MessageData: data,
SequenceNo: b.SequenceNo,
}

return cbor.Marshal(msg)
Expand Down Expand Up @@ -95,6 +102,7 @@ func (b *Bundle) Decode(r io.Reader) (int, error) {
}

b.Flags = bdl.Flags
b.SequenceNo = bdl.SequenceNo
b.Message = msg
return bytesRead, cbor.Unmarshal(data, msg)
}
13 changes: 11 additions & 2 deletions sync/bundle/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,17 @@ func TestDecodeVoteCBOR(t *testing.T) {
"035879a101a70101" +
"02186403010458200264572d4d6bfcd2140d4f885fd5a32fe42fdbf40551e4ff89f3d235e32b4b92055501c0067d277f2dff" +
"99943016d6a0f379cf09846c6f06f60758308ab7aecbe03c4ed5b688bcb7e848baffa62bcbf1a4021522c56693f0a7bbcc1f" +
"e865277556ee59c1f63ba592acfe1b43")
"e865277556ee59c1f63ba592acfe1b43" +
"0401") // SequenceNo
d2, _ := hex.DecodeString(
"a3" +
"01190100" + // flags: 0x0100 (compressed)
"0207" + // Type (vote)
"0358951f8b08" +
"000000000000ff00790086ffa101a7010102186403010458200264572d4d6bfcd2140d4f885fd5a32fe42fdbf40551e4ff89" +
"f3d235e32b4b92055501c0067d277f2dff99943016d6a0f379cf09846c6f06f60758308ab7aecbe03c4ed5b688bcb7e848ba" +
"ffa62bcbf1a4021522c56693f0a7bbcc1fe865277556ee59c1f63ba592acfe1b43010000ffff798ce7ec79000000")
"ffa62bcbf1a4021522c56693f0a7bbcc1fe865277556ee59c1f63ba592acfe1b43010000ffff798ce7ec79000000" +
"0401") // SequenceNo

bdl1 := new(Bundle)
bdl2 := new(Bundle)
Expand All @@ -102,3 +104,10 @@ func TestDecodeVoteCBOR(t *testing.T) {
assert.Equal(t, bdl1.Message, bdl2.Message)
assert.Contains(t, bdl1.String(), "vote")
}

func TestSetSequenceNo(t *testing.T) {
bdl := new(Bundle)
bdl.SetSequenceNo(1001)

assert.Equal(t, 1001, bdl.SequenceNo)
}
11 changes: 10 additions & 1 deletion sync/peerset/peer_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type PeerSet struct {
sessions map[int]*session.Session
nextSessionID int
sessionTimeout time.Duration
totalSentBundles int
totalSentBytes int64
totalReceivedBytes int64
sentBytes map[message.Type]int64
Expand Down Expand Up @@ -326,10 +327,11 @@ func (ps *PeerSet) IncreaseReceivedBytesCounter(pid peer.ID, msgType message.Typ
ps.receivedBytes[msgType] += c
}

func (ps *PeerSet) IncreaseSentBytesCounter(msgType message.Type, c int64, pid *peer.ID) {
func (ps *PeerSet) IncreaseSentCounters(msgType message.Type, c int64, pid *peer.ID) {
ps.lk.Lock()
defer ps.lk.Unlock()

ps.totalSentBundles++
ps.totalSentBytes += c
ps.sentBytes[msgType] += c

Expand All @@ -339,6 +341,13 @@ func (ps *PeerSet) IncreaseSentBytesCounter(msgType message.Type, c int64, pid *
}
}

func (ps *PeerSet) TotalSentBundles() int {
ps.lk.RLock()
defer ps.lk.RUnlock()

return ps.totalSentBundles
}

func (ps *PeerSet) TotalSentBytes() int64 {
ps.lk.RLock()
defer ps.lk.RUnlock()
Expand Down
5 changes: 3 additions & 2 deletions sync/peerset/peer_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func TestPeerSet(t *testing.T) {
peerSet.IncreaseReceivedBundlesCounter(pid1)
peerSet.IncreaseReceivedBytesCounter(pid1, message.TypeBlocksResponse, 100)
peerSet.IncreaseReceivedBytesCounter(pid1, message.TypeTransactions, 150)
peerSet.IncreaseSentBytesCounter(message.TypeBlocksRequest, 200, nil)
peerSet.IncreaseSentBytesCounter(message.TypeBlocksRequest, 250, &pid1)
peerSet.IncreaseSentCounters(message.TypeBlocksRequest, 200, nil)
peerSet.IncreaseSentCounters(message.TypeBlocksRequest, 250, &pid1)

peer1 := peerSet.getPeer(pid1)

Expand All @@ -100,6 +100,7 @@ func TestPeerSet(t *testing.T) {
assert.Equal(t, peerSet.TotalSentBytes(), int64(450))
assert.Equal(t, peerSet.SentBytesMessageType(message.TypeBlocksRequest), int64(450))
assert.Equal(t, peerSet.SentBytes(), sentBytes)
assert.Equal(t, peerSet.TotalSentBundles(), 2)
})

t.Run("Testing UpdateHeight", func(t *testing.T) {
Expand Down
6 changes: 4 additions & 2 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ func (sync *synchronizer) prepareBundle(msg message.Message) *bundle.Bundle {
// It's localnet and for testing purpose only
}

bdl.SetSequenceNo(sync.peerSet.TotalSentBundles())

return bdl
}
return nil
Expand All @@ -167,7 +169,7 @@ func (sync *synchronizer) sendTo(msg message.Message, to peer.ID) {
}

sync.peerSet.UpdateLastSent(to)
sync.peerSet.IncreaseSentBytesCounter(msg.Type(), int64(len(data)), &to)
sync.peerSet.IncreaseSentCounters(msg.Type(), int64(len(data)), &to)
sync.logger.Info("bundle sent", "bundle", bdl, "to", to)
}
}
Expand All @@ -194,7 +196,7 @@ func (sync *synchronizer) broadcast(msg message.Message) {
} else {
sync.logger.Info("broadcasting new bundle", "bundle", bdl)
}
sync.peerSet.IncreaseSentBytesCounter(msg.Type(), int64(len(data)), nil)
sync.peerSet.IncreaseSentCounters(msg.Type(), int64(len(data)), nil)
}
}

Expand Down
23 changes: 19 additions & 4 deletions sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,20 +373,35 @@ func TestBroadcastBlockAnnounce(t *testing.T) {

t.Run("Should announce the block", func(t *testing.T) {
blk, cert := td.GenerateTestBlock(td.RandHeight())
msg := message.BlockAnnounceMessage{Block: blk, Certificate: cert}
msg := message.NewBlockAnnounceMessage(blk, cert)

td.broadcastCh <- &msg
td.sync.broadcast(msg)

td.shouldPublishMessageWithThisType(t, message.TypeBlockAnnounce)
})

t.Run("Should NOT announce the block", func(t *testing.T) {
blk, cert := td.GenerateTestBlock(td.RandHeight())
msg := message.BlockAnnounceMessage{Block: blk, Certificate: cert}
msg := message.NewBlockAnnounceMessage(blk, cert)

td.sync.cache.AddBlock(blk)
td.broadcastCh <- &msg
td.sync.broadcast(msg)

td.shouldNotPublishMessageWithThisType(t, message.TypeBlockAnnounce)
})
}

func TestBundleSequenceNo(t *testing.T) {
td := setup(t, nil)

msg := message.NewQueryProposalMessage(td.RandHeight(), td.RandValAddress())

td.sync.broadcast(msg)
bdl1 := td.shouldPublishMessageWithThisType(t, message.TypeQueryProposal)
assert.Equal(t, 0, bdl1.SequenceNo)

// Sending the same message again
td.sync.broadcast(msg)
bdl2 := td.shouldPublishMessageWithThisType(t, message.TypeQueryProposal)
assert.Equal(t, 1, bdl2.SequenceNo)
}
Loading