Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exp timeout #764

Open
wants to merge 5 commits into
base: dev-upgrade
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions common/countdown/const_duration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// A countdown timer that will mostly be used by XDPoS v2 consensus engine
package countdown

import "time"

type ConstTimeoutDuration struct {
duration time.Duration
}

func NewConstTimeoutDuration(duration time.Duration) *ConstTimeoutDuration {
return &ConstTimeoutDuration{
duration: duration,
}
}

func (d *ConstTimeoutDuration) GetTimeoutDuration(inputs ...interface{}) time.Duration {
return d.duration
}

func (d *ConstTimeoutDuration) SetParams(inputs ...interface{}) {
if len(inputs) > 0 {
if duration, ok := inputs[0].(time.Duration); ok {
d.duration = duration
}
}
}
54 changes: 34 additions & 20 deletions common/countdown/countdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,36 @@ import (
"github.com/XinFinOrg/XDPoSChain/log"
)

type TimeoutDurationHelper interface {
GetTimeoutDuration(inputs ...interface{}) time.Duration
SetParams(inputs ...interface{})
}

type CountdownTimer struct {
lock sync.RWMutex // Protects the Initilised field
resetc chan int
quitc chan chan struct{}
initilised bool
timeoutDuration time.Duration
lock sync.RWMutex // Protects the Initilised field
resetc chan int
quitc chan chan struct{}
initilised bool
durationHelper TimeoutDurationHelper
// Triggered when the countdown timer timeout for the `timeoutDuration` period, it will pass current timestamp to the callback function
OnTimeoutFn func(time time.Time, i interface{}) error
OnTimeoutFn func(time time.Time, i ...interface{}) error
}

func NewConstCountDown(duration time.Duration) *CountdownTimer {
return &CountdownTimer{
resetc: make(chan int),
quitc: make(chan chan struct{}),
initilised: false,
durationHelper: NewConstTimeoutDuration(duration),
}
}

func NewCountDown(duration time.Duration) *CountdownTimer {
func NewExpCountDown(duration time.Duration, base float64, max_exponent uint8) *CountdownTimer {
return &CountdownTimer{
resetc: make(chan int),
quitc: make(chan chan struct{}),
initilised: false,
timeoutDuration: duration,
resetc: make(chan int),
quitc: make(chan chan struct{}),
initilised: false,
durationHelper: NewExpTimeoutDuration(duration, base, max_exponent),
}
}

Expand All @@ -34,25 +48,25 @@ func (t *CountdownTimer) StopTimer() {
<-q
}

func (t *CountdownTimer) SetTimeoutDuration(duration time.Duration) {
t.timeoutDuration = duration
func (t *CountdownTimer) SetParams(inputs ...interface{}) {
t.durationHelper.SetParams(inputs...)
}

// Reset will start the countdown timer if it's already stopped, or simply reset the countdown time back to the defual `duration`
func (t *CountdownTimer) Reset(i interface{}) {
func (t *CountdownTimer) Reset(inputs ...interface{}) {
if !t.isInitilised() {
t.setInitilised(true)
go t.startTimer(i)
go t.startTimer(inputs...)
} else {
t.resetc <- 0
}
}

// A long running process that
func (t *CountdownTimer) startTimer(i interface{}) {
func (t *CountdownTimer) startTimer(inputs ...interface{}) {
// Make sure we mark Initilised to false when we quit the countdown
defer t.setInitilised(false)
timer := time.NewTimer(t.timeoutDuration)
timer := time.NewTimer(t.durationHelper.GetTimeoutDuration(inputs...))
// We start with a inf loop
for {
select {
Expand All @@ -63,16 +77,16 @@ func (t *CountdownTimer) startTimer(i interface{}) {
case <-timer.C:
log.Debug("Countdown time reached!")
go func() {
err := t.OnTimeoutFn(time.Now(), i)
err := t.OnTimeoutFn(time.Now(), inputs...)
if err != nil {
log.Error("OnTimeoutFn error", "error", err)
}
log.Debug("OnTimeoutFn processed")
}()
timer.Reset(t.timeoutDuration)
timer.Reset(t.durationHelper.GetTimeoutDuration(inputs...))
case <-t.resetc:
log.Debug("Reset countdown timer")
timer.Reset(t.timeoutDuration)
timer.Reset(t.durationHelper.GetTimeoutDuration(inputs...))
}
}
}
Expand Down
20 changes: 10 additions & 10 deletions common/countdown/countdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (
func TestCountdownWillCallback(t *testing.T) {
var fakeI interface{}
called := make(chan int)
OnTimeoutFn := func(time.Time, interface{}) error {
OnTimeoutFn := func(time.Time, ...interface{}) error {
called <- 1
return nil
}

countdown := NewCountDown(1000 * time.Millisecond)
countdown := NewConstCountDown(1000 * time.Millisecond)
countdown.OnTimeoutFn = OnTimeoutFn
countdown.Reset(fakeI)
<-called
Expand All @@ -26,12 +26,12 @@ func TestCountdownWillCallback(t *testing.T) {
func TestCountdownShouldReset(t *testing.T) {
var fakeI interface{}
called := make(chan int)
OnTimeoutFn := func(time.Time, interface{}) error {
OnTimeoutFn := func(time.Time, ...interface{}) error {
called <- 1
return nil
}

countdown := NewCountDown(5000 * time.Millisecond)
countdown := NewConstCountDown(5000 * time.Millisecond)
countdown.OnTimeoutFn = OnTimeoutFn
// Check countdown did not start
assert.False(t, countdown.isInitilised())
Expand Down Expand Up @@ -74,12 +74,12 @@ firstReset:
func TestCountdownShouldResetEvenIfErrored(t *testing.T) {
var fakeI interface{}
called := make(chan int)
OnTimeoutFn := func(time.Time, interface{}) error {
OnTimeoutFn := func(time.Time, ...interface{}) error {
called <- 1
return errors.New("ERROR!")
}

countdown := NewCountDown(5000 * time.Millisecond)
countdown := NewConstCountDown(5000 * time.Millisecond)
countdown.OnTimeoutFn = OnTimeoutFn
// Check countdown did not start
assert.False(t, countdown.isInitilised())
Expand Down Expand Up @@ -122,12 +122,12 @@ firstReset:
func TestCountdownShouldBeAbleToStop(t *testing.T) {
var fakeI interface{}
called := make(chan int)
OnTimeoutFn := func(time.Time, interface{}) error {
OnTimeoutFn := func(time.Time, ...interface{}) error {
called <- 1
return nil
}

countdown := NewCountDown(5000 * time.Millisecond)
countdown := NewConstCountDown(5000 * time.Millisecond)
countdown.OnTimeoutFn = OnTimeoutFn
// Check countdown did not start
assert.False(t, countdown.isInitilised())
Expand All @@ -144,8 +144,8 @@ func TestCountdownShouldBeAbleToStop(t *testing.T) {
func TestCountdownShouldAvoidDeadlock(t *testing.T) {
var fakeI interface{}
called := make(chan int)
countdown := NewCountDown(5000 * time.Millisecond)
OnTimeoutFn := func(time.Time, interface{}) error {
countdown := NewConstCountDown(5000 * time.Millisecond)
OnTimeoutFn := func(time.Time, ...interface{}) error {
countdown.Reset(fakeI)
called <- 1
return nil
Expand Down
78 changes: 78 additions & 0 deletions common/countdown/exp_duration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// A countdown timer that will mostly be used by XDPoS v2 consensus engine
package countdown

import (
"math"
"time"

"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/log"
)

const max_exponent_upperbound uint8 = 32

type ExpTimeoutDuration struct {
duration time.Duration
base float64
max_exponent uint8
}

func NewExpTimeoutDuration(duration time.Duration, base float64, max_exponent uint8) *ExpTimeoutDuration {
d := &ExpTimeoutDuration{
duration: duration,
base: base,
max_exponent: max_exponent,
}
d.sanityCheck()
return d
}

func (d *ExpTimeoutDuration) sanityCheck() {
if d.max_exponent >= max_exponent_upperbound {
log.Error("max_exponent (e)= >= max_exponent_upperbound (e_upper)", "e", d.max_exponent, "e_upper", max_exponent_upperbound)
panic("max_exponent (e)= >= max_exponent_upperbound (e_upper)")
}
if math.Pow(d.base, float64(d.max_exponent)) >= float64(math.MaxUint32) {
log.Error("base^max_exponent (b^e) should be less than 2^32", "b", d.base, "e", d.max_exponent)
panic("base^max_exponent (b^e) should be less than 2^32")
}
}

// The inputs should be: [blockchain, currentRound, highestQuorumCert's round]
func (d *ExpTimeoutDuration) GetTimeoutDuration(inputs ...interface{}) time.Duration {
power := float64(1)
if len(inputs) >= 3 {
if currentRound, ok := inputs[1].(types.Round); ok {
if highestRound, ok := inputs[2].(types.Round); ok {
// below statement must be true, just to prevent negative result
if highestRound < currentRound {
exp := uint8(currentRound-highestRound) - 1
if exp > d.max_exponent {
exp = d.max_exponent
}
power = math.Pow(d.base, float64(exp))
}
}
}
}
return d.duration * time.Duration(power)
}

func (d *ExpTimeoutDuration) SetParams(inputs ...interface{}) {
if len(inputs) >= 1 {
if duration, ok := inputs[0].(time.Duration); ok {
d.duration = duration
}
}
if len(inputs) >= 2 {
if base, ok := inputs[1].(float64); ok {
d.base = base
}
}
if len(inputs) >= 3 {
if exponent, ok := inputs[2].(uint8); ok {
d.max_exponent = exponent
}
}
d.sanityCheck()
}
74 changes: 74 additions & 0 deletions common/countdown/exp_duration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package countdown

import (
"testing"
"time"

"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/stretchr/testify/assert"
)

func TestExpDuration(t *testing.T) {
base := float64(2.0)
max_exponent := uint8(2)
duration := time.Second * 59
helper := NewExpTimeoutDuration(duration, base, max_exponent)
// round 10 = 9+1, normal case, should be base
currentRound := types.Round(10)
highestQCRound := types.Round(9)
result := helper.GetTimeoutDuration(nil, currentRound, highestQCRound)
assert.Equal(t, duration, result)

// round 11 = 9+2, already 1 round timeout, should be base*exponent
currentRound++
result = helper.GetTimeoutDuration(nil, currentRound, highestQCRound)
assert.Equal(t, duration*time.Duration(base), result)

// round 12 = 9+3, already 2 rounds timeout, should be base*exponent^2
currentRound++
result = helper.GetTimeoutDuration(nil, currentRound, highestQCRound)
assert.Equal(t, duration*time.Duration(base)*time.Duration(base), result)

// test compatible with const timeout user calling it
result = helper.GetTimeoutDuration(nil)
assert.Equal(t, duration, result)

// test SetParams
duration++
max_exponent++
base++
helper.SetParams(duration, base, max_exponent)
result = helper.GetTimeoutDuration(nil, currentRound, highestQCRound)
assert.Equal(t, duration*time.Duration(base)*time.Duration(base), result)
duration++
helper.SetParams(duration)
result = helper.GetTimeoutDuration(nil, currentRound, highestQCRound)
assert.Equal(t, duration*time.Duration(base)*time.Duration(base), result)

// round 14 = 9+5, already 4 rounds timeout, but max_exponent=3, should be base*exponent^3
currentRound++
currentRound++
result = helper.GetTimeoutDuration(nil, currentRound, highestQCRound)
assert.Equal(t, duration*time.Duration(base)*time.Duration(base)*time.Duration(base), result)

// extreme case
helper.SetParams(duration, float64(0), uint8(0))
result = helper.GetTimeoutDuration(nil, currentRound, highestQCRound)
assert.Equal(t, duration, result)
}

func TestInvalidParameter(t *testing.T) {
assert.Panics(t, func() {
base := float64(2.0)
max_exponent := uint8(32)
duration := time.Second * 59
_ = NewExpTimeoutDuration(duration, base, max_exponent)
})

assert.Panics(t, func() {
base := float64(3.0)
max_exponent := uint8(21)
duration := time.Second * 59
_ = NewExpTimeoutDuration(duration, base, max_exponent)
})
}
4 changes: 2 additions & 2 deletions consensus/XDPoS/engines/engine_v2/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func New(chainConfig *params.ChainConfig, db ethdb.Database, minePeriodCh chan i
config := chainConfig.XDPoS
// Setup timeoutTimer
duration := time.Duration(config.V2.CurrentConfig.TimeoutPeriod) * time.Second
timeoutTimer := countdown.NewCountDown(duration)
timeoutTimer := countdown.NewExpCountDown(duration, config.V2.CurrentConfig.ExpTimeoutConfig.Base, config.V2.CurrentConfig.ExpTimeoutConfig.MaxExponent)

snapshots, _ := lru.NewARC(utils.InmemorySnapshots)
signatures, _ := lru.NewARC(utils.InmemorySnapshots)
Expand Down Expand Up @@ -143,7 +143,7 @@ func (x *XDPoS_v2) UpdateParams(header *types.Header) {

// Setup timeoutTimer
duration := time.Duration(x.config.V2.CurrentConfig.TimeoutPeriod) * time.Second
x.timeoutWorker.SetTimeoutDuration(duration)
x.timeoutWorker.SetParams(duration, x.config.V2.CurrentConfig.ExpTimeoutConfig.Base, x.config.V2.CurrentConfig.ExpTimeoutConfig.MaxExponent)

// avoid deadlock
go func() {
Expand Down
6 changes: 3 additions & 3 deletions consensus/XDPoS/engines/engine_v2/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,17 +233,17 @@ func (x *XDPoS_v2) sendTimeout(chain consensus.ChainReader) error {
Function that will be called by timer when countdown reaches its threshold.
In the engine v2, we would need to broadcast timeout messages to other peers
*/
func (x *XDPoS_v2) OnCountdownTimeout(time time.Time, chain interface{}) error {
func (x *XDPoS_v2) OnCountdownTimeout(time time.Time, chain ...interface{}) error {
x.lock.Lock()
defer x.lock.Unlock()

// Check if we are within the master node list
allow := x.allowedToSend(chain.(consensus.ChainReader), chain.(consensus.ChainReader).CurrentHeader(), "timeout")
allow := x.allowedToSend(chain[0].(consensus.ChainReader), chain[0].(consensus.ChainReader).CurrentHeader(), "timeout")
if !allow {
return nil
}

err := x.sendTimeout(chain.(consensus.ChainReader))
err := x.sendTimeout(chain[0].(consensus.ChainReader))
if err != nil {
log.Error("Error while sending out timeout message at time: ", "time", time, "err", err)
return err
Expand Down
Loading
Loading