Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(sync): refactoring syncing process #676

Merged
merged 4 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions config/example_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,10 @@
# `moniker` is a custom human-readable name for this node.
## moniker = ""

# `heartbeat_timer` is a timer for broadcasting a heartbeat message to the network.
# Default is 5 seconds
# Set the value to zero e.g heartbeat_timer = "0s" to disable heartbeat broadcasing.
## heartbeat_timer = "5s"

# `session_timeout` is a timeout for a session to be opened.
# Default is 10 seconds
## session_timeout = "10s"

# `max_open_sessions` is the maximum number of open sessions.
# Default is 8
## max_open_sessions = 8

# `block_per_message` is the number of blocks per message.
# Default is 60.
## block_per_message = 60
Expand Down
4 changes: 2 additions & 2 deletions consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func setupWithSeed(t *testing.T, seed int64) *testData {
params.CommitteeSize = 4

// to prevent triggering timers before starting the tests to avoid double entries for new heights in some tests.
getTime := util.RoundNow(params.BlockTimeInSecond).Add(time.Duration(params.BlockTimeInSecond) * time.Second)
getTime := util.RoundNow(params.BlockIntervalInSecond).Add(time.Duration(params.BlockIntervalInSecond) * time.Second)
genDoc := genesis.MakeGenesis(getTime, accs, vals, params)
stX, err := state.LoadOrNewState(genDoc, []crypto.Signer{signers[tIndexX]},
store.MockingStore(ts), txPool, nil)
Expand Down Expand Up @@ -138,7 +138,7 @@ func setupWithSeed(t *testing.T, seed int64) *testData {
broadcaster, newConcreteMediator())

// -------------------------------
// For better logging when testing
// Better logging during testing
overrideLogger := func(cons *consensus, name string) {
cons.logger = logger.NewSubLogger("_consensus",
&OverrideStringer{name: fmt.Sprintf("%s - %s: ", name, t.Name()), cons: cons})
Expand Down
2 changes: 1 addition & 1 deletion consensus/height.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type newHeightState struct {
}

func (s *newHeightState) enter() {
sleep := s.state.LastBlockTime().Add(s.state.BlockTime()).Sub(util.Now())
sleep := s.state.LastBlockTime().Add(s.state.Params().BlockInterval()).Sub(util.Now())
s.scheduleTimeout(sleep, s.height, s.round, tickerTargetNewHeight)
}

Expand Down
4 changes: 2 additions & 2 deletions consensus/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ func TestManager(t *testing.T) {
acc := account.NewAccount(0)
acc.AddToBalance(21 * 1e14)
params := param.DefaultParams()
params.BlockTimeInSecond = 1
params.BlockIntervalInSecond = 1
vals := make([]*validator.Validator, 5)
for i, s := range committeeSigners {
val := validator.NewValidator(s.PublicKey().(*bls.PublicKey), int32(i))
vals[i] = val
}
accs := map[crypto.Address]*account.Account{crypto.TreasuryAddress: acc}
// to prevent triggering timers before starting the tests to avoid double entries for new heights in some tests.
getTime := util.RoundNow(params.BlockTimeInSecond).Add(time.Duration(params.BlockTimeInSecond) * time.Second)
getTime := util.RoundNow(params.BlockIntervalInSecond).Add(time.Duration(params.BlockIntervalInSecond) * time.Second)
genDoc := genesis.MakeGenesis(getTime, accs, vals, params)

rewardAddrs := []crypto.Address{
Expand Down
2 changes: 1 addition & 1 deletion genesis/genesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestMarshaling(t *testing.T) {
[]*validator.Validator{val}, param.DefaultParams())
gen2 := new(genesis.Genesis)

assert.Equal(t, gen1.Params().BlockTimeInSecond, 10)
assert.Equal(t, gen1.Params().BlockIntervalInSecond, 10)

bz, err := json.MarshalIndent(gen1, " ", " ")
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion genesis/testnet.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"genesis_time": "2023-05-08T00:00:00Z",
"params": {
"block_version": 63,
"block_time_in_second": 10,
"block_interval_in_second": 10,
"committee_size": 21,
"block_reward": 1000000000,
"transaction_to_live_interval": 2880,
Expand Down
26 changes: 13 additions & 13 deletions network/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,28 @@ import (

var _ Network = &MockNetwork{}

type BroadcastData struct {
type PublishData struct {
Data []byte
Target *lp2pcore.PeerID
}

type MockNetwork struct {
*testsuite.TestSuite

BroadcastCh chan BroadcastData
EventCh chan Event
ID peer.ID
OtherNets []*MockNetwork
SendError error
PublishCh chan PublishData
EventCh chan Event
ID peer.ID
OtherNets []*MockNetwork
SendError error
}

func MockingNetwork(ts *testsuite.TestSuite, id peer.ID) *MockNetwork {
return &MockNetwork{
TestSuite: ts,
BroadcastCh: make(chan BroadcastData, 100),
EventCh: make(chan Event, 100),
OtherNets: make([]*MockNetwork, 0),
ID: id,
TestSuite: ts,
PublishCh: make(chan PublishData, 100),
EventCh: make(chan Event, 100),
OtherNets: make([]*MockNetwork, 0),
ID: id,
}
}

Expand Down Expand Up @@ -63,15 +63,15 @@ func (mock *MockNetwork) SendTo(data []byte, pid lp2pcore.PeerID) error {
if mock.SendError != nil {
return mock.SendError
}
mock.BroadcastCh <- BroadcastData{
mock.PublishCh <- PublishData{
Data: data,
Target: &pid,
}
return nil
}

func (mock *MockNetwork) Broadcast(data []byte, _ TopicID) error {
mock.BroadcastCh <- BroadcastData{
mock.PublishCh <- PublishData{
Data: data,
Target: nil, // Send to all
}
Expand Down
1 change: 0 additions & 1 deletion state/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ type Facade interface {
LastBlockHash() hash.Hash
LastBlockTime() time.Time
LastCertificate() *certificate.Certificate
BlockTime() time.Duration
UpdateLastCertificate(lastCertificate *certificate.Certificate) error
ProposeBlock(consSigner crypto.Signer, rewardAddr crypto.Address, round int16) (*block.Block, error)
ValidateBlock(block *block.Block) error
Expand Down
16 changes: 8 additions & 8 deletions state/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/pactus-project/pactus/types/tx"
"github.com/pactus-project/pactus/types/tx/payload"
"github.com/pactus-project/pactus/types/validator"
"github.com/pactus-project/pactus/util"
"github.com/pactus-project/pactus/util/errors"
"github.com/pactus-project/pactus/util/testsuite"
)
Expand All @@ -39,13 +38,14 @@ type MockState struct {

func MockingState(ts *testsuite.TestSuite) *MockState {
committee, _ := ts.GenerateTestCommittee(21)
genDoc := genesis.TestnetGenesis()
return &MockState{
ts: ts,
TestGenesis: genesis.TestnetGenesis(), // TODO: replace me with the Mainnet genesis
TestGenesis: genDoc,
TestStore: store.MockingStore(ts),
TestPool: txpool.MockingTxPool(),
TestCommittee: committee,
TestParams: param.DefaultParams(),
TestParams: genDoc.Params(),
}
}

Expand Down Expand Up @@ -77,7 +77,11 @@ func (m *MockState) LastBlockHash() hash.Hash {
}

func (m *MockState) LastBlockTime() time.Time {
return util.Now()
if len(m.TestStore.Blocks) > 0 {
return m.TestStore.Blocks[m.TestStore.LastHeight].Header().Time()
}

return m.Genesis().GenesisTime()
}

func (m *MockState) LastCertificate() *certificate.Certificate {
Expand All @@ -87,10 +91,6 @@ func (m *MockState) LastCertificate() *certificate.Certificate {
return m.TestStore.LastCert
}

func (m *MockState) BlockTime() time.Duration {
return time.Second
}

func (m *MockState) UpdateLastCertificate(cert *certificate.Certificate) error {
m.TestStore.LastCert = cert
return nil
Expand Down
15 changes: 4 additions & 11 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,6 @@ func (st *state) LastCertificate() *certificate.Certificate {
return st.lastInfo.Certificate()
}

func (st *state) BlockTime() time.Duration {
st.lk.RLock()
defer st.lk.RUnlock()

return st.params.BlockTime()
}

func (st *state) UpdateLastCertificate(cert *certificate.Certificate) error {
st.lk.Lock()
defer st.lk.Unlock()
Expand Down Expand Up @@ -541,7 +534,7 @@ func (st *state) commitSandbox(sb sandbox.Sandbox, round int16) {
}

func (st *state) validateBlockTime(t time.Time) error {
if t.Second()%st.params.BlockTimeInSecond != 0 {
if t.Second()%st.params.BlockIntervalInSecond != 0 {
return errors.Errorf(errors.ErrInvalidBlock, "block time (%s) is not rounded", t.String())
}
if t.Before(st.lastInfo.BlockTime()) {
Expand All @@ -551,7 +544,7 @@ func (st *state) validateBlockTime(t time.Time) error {
return errors.Errorf(errors.ErrInvalidBlock, "block time (%s) is same as the last block time", t.String())
}
proposeTime := st.proposeNextBlockTime()
threshold := st.params.BlockTime()
threshold := st.params.BlockInterval()
if t.After(proposeTime.Add(threshold)) {
return errors.Errorf(errors.ErrInvalidBlock, "block time (%s) is more than threshold (%s)",
t.String(), proposeTime.String())
Expand All @@ -575,12 +568,12 @@ func (st *state) CommitteePower() int64 {
}

func (st *state) proposeNextBlockTime() time.Time {
timestamp := st.lastInfo.BlockTime().Add(st.params.BlockTime())
timestamp := st.lastInfo.BlockTime().Add(st.params.BlockInterval())

now := util.Now()
if now.After(timestamp.Add(1 * time.Second)) {
st.logger.Debug("it looks the last block had delay", "delay", now.Sub(timestamp))
timestamp = util.RoundNow(st.params.BlockTimeInSecond)
timestamp = util.RoundNow(st.params.BlockIntervalInSecond)
}
return timestamp
}
Expand Down
20 changes: 17 additions & 3 deletions state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,22 @@ func TestBlockSubsidyTx(t *testing.T) {
assert.Equal(t, trx.Payload().(*payload.TransferPayload).Receiver, rewardAddr)
}

func TestBlockTime(t *testing.T) {
td := setup(t)

t.Run("No blocks: LastBlockTime is the genesis time", func(t *testing.T) {
assert.Equal(t, td.state1.LastBlockTime(), td.state1.Genesis().GenesisTime())
})

t.Run("Commit one block: LastBlockTime is the time of the first block", func(t *testing.T) {
b1, c1 := td.makeBlockAndCertificate(t, 1, td.valSigner1, td.valSigner2, td.valSigner3)
assert.NoError(t, td.state1.CommitBlock(1, b1, c1))

assert.NotEqual(t, td.state1.LastBlockTime(), td.state1.Genesis().GenesisTime())
assert.Equal(t, td.state1.LastBlockTime(), b1.Header().Time())
})
}

func TestCommitBlocks(t *testing.T) {
td := setup(t)

Expand Down Expand Up @@ -482,7 +498,7 @@ func TestSortition(t *testing.T) {
func TestValidateBlockTime(t *testing.T) {
td := setup(t)

fmt.Printf("BlockTimeInSecond: %d\n", td.state1.params.BlockTimeInSecond)
fmt.Printf("BlockTimeInSecond: %d\n", td.state1.params.BlockIntervalInSecond)

// Time not rounded
roundedNow := util.RoundNow(10)
Expand Down Expand Up @@ -668,8 +684,6 @@ func TestLoadStateAfterChangingGenesis(t *testing.T) {
func TestSetBlockTime(t *testing.T) {
td := setup(t)

assert.Equal(t, td.state1.BlockTime(), 10*time.Second)

t.Run("Last block time is a bit far in past", func(t *testing.T) {
td.state1.lastInfo.UpdateBlockTime(util.RoundNow(10).Add(-20 * time.Second))
b, _ := td.state1.ProposeBlock(td.state1.signers[0], td.RandAddress(), 0)
Expand Down
2 changes: 1 addition & 1 deletion sync/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const (
BundleFlagCarrierLibP2P = 0x0010
BundleFlagCompressed = 0x0100
BundleFlagBroadcasted = 0x0200
BundleFlagHelloMessage = 0x0400
BundleFlagHandshaking = 0x0400
)

type Bundle struct {
Expand Down
5 changes: 0 additions & 5 deletions sync/bundle/message/blocks_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ import (
"github.com/pactus-project/pactus/util/errors"
)

const (
LatestBlocksResponseCodeOK = 0
LatestBlocksResponseCodeNoMoreBlock = 1
)

type BlocksResponseMessage struct {
ResponseCode ResponseCode `cbor:"1,keyasint"`
SessionID int `cbor:"2,keyasint"`
Expand Down
27 changes: 10 additions & 17 deletions sync/bundle/message/blocks_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,54 +56,47 @@ func TestBlocksResponseMessage(t *testing.T) {
func TestLatestBlocksResponseCode(t *testing.T) {
ts := testsuite.NewTestSuite(t)

t.Run("busy", func(t *testing.T) {
m := NewBlocksResponseMessage(ResponseCodeRejected, ResponseCodeRejected.String(), 1, 0, nil, nil)

assert.NoError(t, m.BasicCheck())
assert.Zero(t, m.From)
assert.Zero(t, m.To())
assert.Zero(t, m.Count())
assert.True(t, m.IsRequestRejected())
assert.Equal(t, m.Reason, ResponseCodeRejected.String())
})

t.Run("rejected", func(t *testing.T) {
m := NewBlocksResponseMessage(ResponseCodeRejected, ResponseCodeRejected.String(), 1, 0, nil, nil)
reason := ts.RandString(16)
m := NewBlocksResponseMessage(ResponseCodeRejected, reason, 1, 0, nil, nil)

assert.NoError(t, m.BasicCheck())
assert.Zero(t, m.From)
assert.Zero(t, m.To())
assert.Zero(t, m.Count())
assert.True(t, m.IsRequestRejected())
assert.Equal(t, m.Reason, ResponseCodeRejected.String())
assert.Equal(t, m.Reason, reason)
})

t.Run("OK - MoreBlocks", func(t *testing.T) {
b1 := ts.GenerateTestBlock(nil)
b2 := ts.GenerateTestBlock(nil)
d1, _ := b1.Bytes()
d2, _ := b2.Bytes()
m := NewBlocksResponseMessage(ResponseCodeMoreBlocks, ResponseCodeMoreBlocks.String(), 1, 100, [][]byte{d1, d2}, nil)
reason := ts.RandString(16)
m := NewBlocksResponseMessage(ResponseCodeMoreBlocks, reason, 1, 100, [][]byte{d1, d2}, nil)

assert.NoError(t, m.BasicCheck())
assert.Equal(t, m.From, uint32(100))
assert.Equal(t, m.To(), uint32(101))
assert.Equal(t, m.Count(), uint32(2))
assert.Zero(t, m.LastCertificateHeight())
assert.False(t, m.IsRequestRejected())
assert.Equal(t, m.Reason, ResponseCodeMoreBlocks.String())
assert.Equal(t, m.Reason, reason)
})

t.Run("OK - Synced", func(t *testing.T) {
cert := ts.GenerateTestCertificate()

m := NewBlocksResponseMessage(ResponseCodeSynced, ResponseCodeSynced.String(), 1, 100, nil, cert)
reason := ts.RandString(16)
m := NewBlocksResponseMessage(ResponseCodeSynced, reason, 1, 100, nil, cert)

assert.NoError(t, m.BasicCheck())
assert.Equal(t, m.From, uint32(100))
assert.Zero(t, m.To())
assert.Zero(t, m.Count())
assert.Equal(t, m.LastCertificateHeight(), uint32(100))
assert.False(t, m.IsRequestRejected())
assert.Equal(t, m.Reason, ResponseCodeSynced.String())
assert.Equal(t, m.Reason, reason)
})
}
Loading
Loading