Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fixing syncing stuck issue #720

Merged
merged 5 commits into from
Oct 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion state/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ type Facade interface {
PendingTx(id tx.ID) *tx.Tx
AddPendingTx(trx *tx.Tx) error
AddPendingTxAndBroadcast(trx *tx.Tx) error
MakeCommittedBlock(data []byte, height uint32, blockHash hash.Hash) *store.CommittedBlock
CommittedBlock(height uint32) *store.CommittedBlock
CommittedTx(id tx.ID) *store.CommittedTx
BlockHash(height uint32) hash.Hash
Expand Down
9 changes: 0 additions & 9 deletions state/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,6 @@ func (m *MockState) CommitteePower() int64 {
return m.TestCommittee.TotalPower()
}

func (m *MockState) MakeCommittedBlock(data []byte, height uint32, blockHash hash.Hash) *store.CommittedBlock {
return &store.CommittedBlock{
Store: m.TestStore,
Data: data,
BlockHash: blockHash,
Height: height,
}
}

func (m *MockState) CommittedBlock(height uint32) *store.CommittedBlock {
m.lk.RLock()
defer m.lk.RUnlock()
Expand Down
9 changes: 0 additions & 9 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,15 +614,6 @@ func (st *state) IsValidator(addr crypto.Address) bool {
return st.store.HasValidator(addr)
}

func (st *state) MakeCommittedBlock(data []byte, height uint32, blockHash hash.Hash) *store.CommittedBlock {
return &store.CommittedBlock{
Store: st.store,
Data: data,
BlockHash: blockHash,
Height: height,
}
}

func (st *state) CommittedBlock(height uint32) *store.CommittedBlock {
b, err := st.store.Block(height)
if err != nil {
Expand Down
13 changes: 0 additions & 13 deletions state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,16 +801,3 @@ func TestCalcFee(t *testing.T) {
assert.Error(t, err)
}
}

func TestMakeCommittedBlock(t *testing.T) {
td := setup(t)

data := td.RandBytes(128)
height := td.RandHeight()
rndHash := td.RandHash()
cb := td.state1.MakeCommittedBlock(data, height, rndHash)
assert.Equal(t, data, cb.Data)
assert.Equal(t, rndHash, cb.BlockHash)
assert.Equal(t, height, cb.Height)
assert.NotNil(t, cb.Store)
}
1 change: 1 addition & 0 deletions store/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (bs *blockStore) saveBlock(batch *leveldb.Batch, height uint32, block *bloc

pubKey := trx.PublicKey()
if pubKey != nil {
// TODO: improve my performance by caching public keys
if !bs.hasPublicKey(trx.Payload().Signer()) {
publicKeyKey := publicKeyKey(trx.Payload().Signer())
batch.Put(publicKeyKey, pubKey.Bytes())
Expand Down
8 changes: 4 additions & 4 deletions store/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
// TODO: How to undo or rollback at least for last 21 blocks

type CommittedBlock struct {
Store
store *store

BlockHash hash.Hash
Height uint32
Expand All @@ -36,7 +36,7 @@ func (s *CommittedBlock) ToBlock() (*block.Block, error) {
for i := 0; i < trxs.Len(); i++ {
trx := trxs[i]
if trx.IsPublicKeyStriped() {
pub, err := s.PublicKey(trx.Payload().Signer())
pub, err := s.store.PublicKey(trx.Payload().Signer())
if err != nil {
return nil, PublicKeyNotFoundError{
Address: trx.Payload().Signer(),
Expand All @@ -50,7 +50,7 @@ func (s *CommittedBlock) ToBlock() (*block.Block, error) {
}

type CommittedTx struct {
Store
store *store

TxID tx.ID
Height uint32
Expand All @@ -65,7 +65,7 @@ func (s *CommittedTx) ToTx() (*tx.Tx, error) {
}

if trx.IsPublicKeyStriped() {
pub, err := s.PublicKey(trx.Payload().Signer())
pub, err := s.store.PublicKey(trx.Payload().Signer())
if err != nil {
return nil, PublicKeyNotFoundError{
Address: trx.Payload().Signer(),
Expand Down
4 changes: 2 additions & 2 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (s *store) Block(height uint32) (*CommittedBlock, error) {
}

return &CommittedBlock{
Store: s,
store: s,
BlockHash: blockHash,
Height: height,
Data: data[hash.HashSize:],
Expand Down Expand Up @@ -187,7 +187,7 @@ func (s *store) Transaction(id tx.ID) (*CommittedTx, error) {
blockTime := util.SliceToUint32(data[hash.HashSize+1 : hash.HashSize+5])

return &CommittedTx{
Store: s,
store: s,
TxID: id,
Height: pos.height,
BlockTime: blockTime,
Expand Down
98 changes: 74 additions & 24 deletions store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package store
import (
"testing"

"github.com/pactus-project/pactus/crypto/bls"
"github.com/pactus-project/pactus/crypto/hash"
"github.com/pactus-project/pactus/types/block"
"github.com/pactus-project/pactus/types/tx"
"github.com/pactus-project/pactus/util"
"github.com/pactus-project/pactus/util/testsuite"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -89,10 +92,10 @@ func TestWriteAndClosePeacefully(t *testing.T) {
func TestRetrieveBlockAndTransactions(t *testing.T) {
td := setup(t)

height, _ := td.store.LastCertificate()
committedBlock, err := td.store.Block(height)
lastHeight, _ := td.store.LastCertificate()
committedBlock, err := td.store.Block(lastHeight)
assert.NoError(t, err)
assert.Equal(t, height, committedBlock.Height)
assert.Equal(t, lastHeight, committedBlock.Height)
block, _ := committedBlock.ToBlock()
for _, trx := range block.Transactions() {
committedTx, err := td.store.Transaction(trx.ID())
Expand Down Expand Up @@ -127,35 +130,82 @@ func TestIndexingPublicKeys(t *testing.T) {
assert.Nil(t, pub)
}

func TestCommittedBlockToBlock(t *testing.T) {
func TestStrippedPublicKey(t *testing.T) {
td := setup(t)

// Use a tricky way to save transactions from the first block again.
// Find a public key that we have already indexed in the database.
committedBlock1, _ := td.store.Block(1)
committedBlock2, _ := td.store.Block(2)
blk1, _ := committedBlock1.ToBlock()
blk2, _ := committedBlock2.ToBlock()
td.store.SaveBlock(11, blk1, blk2.PrevCertificate())
err := td.store.WriteBatch()
assert.NoError(t, err)
trx0PubKey := blk1.Transactions()[0].PublicKey()
assert.NotNil(t, trx0PubKey)
knownPubKey := trx0PubKey.(*bls.PublicKey)

// Ensure that the committed block can obtain the public key.
committedBlock11, err := td.store.Block(11)
assert.NoError(t, err)
lastHeight, _ := td.store.LastCertificate()
lockTime := lastHeight
randPubkey, _ := td.RandBLSKeyPair()

trx0 := tx.NewTransferTx(lockTime, knownPubKey.AccountAddress(), td.RandAccAddress(), 1, 1, "")
trx1 := tx.NewTransferTx(lockTime, randPubkey.AccountAddress(), td.RandAccAddress(), 1, 1, "")
trx2 := tx.NewTransferTx(lockTime, randPubkey.AccountAddress(), td.RandAccAddress(), 1, 1, "")

trx0.SetSignature(td.RandBLSSignature())
trx1.SetSignature(td.RandBLSSignature())
trx2.SetSignature(td.RandBLSSignature())

trx0.StripPublicKey()
trx1.SetPublicKey(randPubkey)
trx2.StripPublicKey()

tests := []struct {
trx *tx.Tx
failed bool
}{
{trx0, false}, // indexed public key and stripped
{trx1, false}, // not stripped
{trx2, true}, // unknown public key and stripped
}

blk11, err := committedBlock11.ToBlock()
assert.NoError(t, err)
for _, test := range tests {
trxs := block.Txs{test.trx}

err = blk11.BasicCheck()
assert.NoError(t, err)
// Make a block
prevCert := td.GenerateTestCertificate()
blk := block.MakeBlock(1, util.Now(), trxs, td.RandHash(), td.RandHash(),
prevCert, td.RandSeed(), td.RandValAddress())

// Ensure that the committed transactions can obtain the public key.
committedTrx, err := td.store.Transaction(blk11.Transactions()[0].ID())
assert.NoError(t, err)
trxData, _ := test.trx.Bytes()
blkData, _ := blk.Bytes()

trx, err := committedTrx.ToTx()
assert.NoError(t, err)
committedTrx := CommittedTx{
store: td.store,
TxID: test.trx.ID(),
Height: lastHeight + 1,
Data: trxData,
}
committedBlock := CommittedBlock{
store: td.store,
BlockHash: blk.Hash(),
Height: lastHeight + 1,
Data: blkData,
}

err = trx.BasicCheck()
assert.NoError(t, err)
//
if test.failed {
_, err := committedBlock.ToBlock()
assert.ErrorIs(t, err, PublicKeyNotFoundError{
Address: test.trx.Payload().Signer(),
})

_, err = committedTrx.ToTx()
assert.ErrorIs(t, err, PublicKeyNotFoundError{
Address: test.trx.Payload().Signer(),
})
} else {
_, err := committedBlock.ToBlock()
assert.NoError(t, err)

_, err = committedTrx.ToTx()
assert.NoError(t, err)
}
}
}
6 changes: 5 additions & 1 deletion sync/handler_block_announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ func (handler *blockAnnounceHandler) ParseMessage(m message.Message, initiator p

handler.cache.AddCertificate(msg.Height, msg.Certificate)
handler.cache.AddBlock(msg.Height, msg.Block)
handler.tryCommitBlocks()

err := handler.tryCommitBlocks()
if err != nil {
return err
}
handler.moveConsensusToNewHeight()

handler.peerSet.UpdateHeight(initiator, msg.Height, msg.Block.Hash())
Expand Down
14 changes: 14 additions & 0 deletions sync/handler_block_announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/pactus-project/pactus/sync/bundle/message"
"github.com/pactus-project/pactus/sync/services"
"github.com/pactus-project/pactus/types/certificate"
"github.com/pactus-project/pactus/util"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -46,6 +47,19 @@ func TestParsingBlockAnnounceMessages(t *testing.T) {
})
}

func TestInvalidBlockAnnounce(t *testing.T) {
td := setup(t, nil)

pid := td.RandPeerID()
lastBlockHeight := td.state.LastBlockHeight()
blk := td.GenerateTestBlock()
invCert := certificate.NewCertificate(td.RandHeight(), 0, nil, nil, nil)
msg := message.NewBlockAnnounceMessage(lastBlockHeight+1, blk, invCert)

err := td.receivingNewMessage(td.sync, msg, pid)
assert.Error(t, err)
}

func TestBroadcastingBlockAnnounceMessages(t *testing.T) {
td := setup(t, nil)

Expand Down
15 changes: 7 additions & 8 deletions sync/handler_blocks_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package sync

import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pactus-project/pactus/crypto/hash"
"github.com/pactus-project/pactus/sync/bundle"
"github.com/pactus-project/pactus/sync/bundle/message"
"github.com/pactus-project/pactus/types/block"
)

type blocksResponseHandler struct {
Expand All @@ -26,19 +26,18 @@ func (handler *blocksResponseHandler) ParseMessage(m message.Message, initiator
} else {
height := msg.From
for _, data := range msg.CommittedBlocksData {
committedBlock := handler.state.MakeCommittedBlock(data, height, hash.UndefHash)
b, err := committedBlock.ToBlock()
blk, err := block.FromBytes(data)
if err != nil {
return err
}
if err := b.BasicCheck(); err != nil {
return err
}
handler.cache.AddBlock(height, b)
handler.cache.AddBlock(height, blk)
height++
}
handler.cache.AddCertificate(msg.From, msg.LastCertificate)
handler.tryCommitBlocks()
err := handler.tryCommitBlocks()
if err != nil {
return err
}
}

handler.updateSession(msg.SessionID, initiator, msg.ResponseCode)
Expand Down
20 changes: 10 additions & 10 deletions sync/handler_blocks_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ func TestInvalidBlockData(t *testing.T) {
for _, test := range tests {
pid := td.RandPeerID()
sid := td.sync.peerSet.OpenSession(pid).SessionID()
cert := td.GenerateTestCertificate()
msg := message.NewBlocksResponseMessage(message.ResponseCodeMoreBlocks, message.ResponseCodeMoreBlocks.String(), sid,
td.RandHeight(), [][]byte{test.data}, nil)
td.state.LastBlockHeight()+1, [][]byte{test.data}, cert)

err := td.receivingNewMessage(td.sync, msg, pid)
assert.ErrorIs(t, err, test.err)
Expand Down Expand Up @@ -99,9 +100,7 @@ func TestStrippedPublicKey(t *testing.T) {
}{
{
blk1,
store.PublicKeyNotFoundError{
Address: blk1.Transactions()[0].Payload().Signer(),
},
store.ErrNotFound,
},
{
blk2,
Expand All @@ -111,12 +110,13 @@ func TestStrippedPublicKey(t *testing.T) {

for _, test := range tests {
assert.NoError(t, test.blk.BasicCheck())
trx1 := test.blk.Transactions()[0]
trx1.StripPublicKey()
d1, _ := test.blk.Bytes()
trx0 := test.blk.Transactions()[0]
trx0.StripPublicKey()
cert := td.GenerateTestCertificate()
blkData, _ := test.blk.Bytes()
sid := td.sync.peerSet.OpenSession(pid).SessionID()
msg := message.NewBlocksResponseMessage(message.ResponseCodeMoreBlocks, message.ResponseCodeRejected.String(), sid,
td.RandHeight(), [][]byte{d1}, nil)
td.state.LastBlockHeight()+1, [][]byte{blkData}, cert)
err := td.receivingNewMessage(td.sync, msg, pid)

assert.ErrorIs(t, err, test.err)
Expand Down Expand Up @@ -245,10 +245,10 @@ func TestSyncing(t *testing.T) {
shouldPublishMessageWithThisType(t, networkAlice, message.TypeBlocksRequest)
shouldPublishMessageWithThisType(t, networkBob, message.TypeBlocksResponse) // 93-100
bdl := shouldPublishMessageWithThisType(t, networkBob, message.TypeBlocksResponse) // Synced

assert.Equal(t, bdl.Message.(*message.BlocksResponseMessage).ResponseCode, message.ResponseCodeSynced)

// Alice needs more time to process all the bundles,
// but the block height should be greater than zero
assert.Greater(t, syncAlice.state.LastBlockHeight(), uint32(0))
assert.Greater(t, syncAlice.state.LastBlockHeight(), uint32(20))
assert.Equal(t, syncBob.state.LastBlockHeight(), uint32(100))
}
Loading
Loading