Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

common/mclock: clean up AfterFunc support #20054

Merged
merged 1 commit into from
Sep 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 11 additions & 19 deletions common/mclock/mclock.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,47 +36,39 @@ func (t AbsTime) Add(d time.Duration) AbsTime {
return t + AbsTime(d)
}

// Clock interface makes it possible to replace the monotonic system clock with
// The Clock interface makes it possible to replace the monotonic system clock with
// a simulated clock.
type Clock interface {
Now() AbsTime
Sleep(time.Duration)
After(time.Duration) <-chan time.Time
AfterFunc(d time.Duration, f func()) Event
AfterFunc(d time.Duration, f func()) Timer
}

// Event represents a cancellable event returned by AfterFunc
type Event interface {
Cancel() bool
// Timer represents a cancellable event returned by AfterFunc
type Timer interface {
Stop() bool
}

// System implements Clock using the system clock.
type System struct{}

// Now implements Clock.
// Now returns the current monotonic time.
func (System) Now() AbsTime {
return AbsTime(monotime.Now())
}

// Sleep implements Clock.
// Sleep blocks for the given duration.
func (System) Sleep(d time.Duration) {
time.Sleep(d)
}

// After implements Clock.
// After returns a channel which receives the current time after d has elapsed.
func (System) After(d time.Duration) <-chan time.Time {
return time.After(d)
}

// AfterFunc implements Clock.
func (System) AfterFunc(d time.Duration, f func()) Event {
return (*SystemEvent)(time.AfterFunc(d, f))
}

// SystemEvent implements Event using time.Timer.
type SystemEvent time.Timer

// Cancel implements Event.
func (e *SystemEvent) Cancel() bool {
return (*time.Timer)(e).Stop()
// AfterFunc runs f on a new goroutine after the duration has elapsed.
func (System) AfterFunc(d time.Duration, f func()) Timer {
return time.AfterFunc(d, f)
}
72 changes: 29 additions & 43 deletions common/mclock/simclock.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,17 @@ import (
// the timeout using a channel or semaphore.
type Simulated struct {
now AbsTime
scheduled []event
scheduled []*simTimer
mu sync.RWMutex
cond *sync.Cond
lastId uint64
}

type event struct {
// simTimer implements Timer on the virtual clock.
type simTimer struct {
do func()
at AbsTime
id uint64
}

// SimulatedEvent implements Event for a virtual clock.
type SimulatedEvent struct {
at AbsTime
id uint64
s *Simulated
}

Expand Down Expand Up @@ -75,13 +70,15 @@ func (s *Simulated) Run(d time.Duration) {
}
}

// ActiveTimers returns the number of timers that haven't fired.
func (s *Simulated) ActiveTimers() int {
s.mu.RLock()
defer s.mu.RUnlock()

return len(s.scheduled)
}

// WaitForTimers waits until the clock has at least n scheduled timers.
func (s *Simulated) WaitForTimers(n int) {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -92,20 +89,21 @@ func (s *Simulated) WaitForTimers(n int) {
}
}

// Now implements Clock.
// Now returns the current virtual time.
func (s *Simulated) Now() AbsTime {
s.mu.RLock()
defer s.mu.RUnlock()

return s.now
}

// Sleep implements Clock.
// Sleep blocks until the clock has advanced by d.
func (s *Simulated) Sleep(d time.Duration) {
<-s.After(d)
}

// After implements Clock.
// After returns a channel which receives the current time after the clock
// has advanced by d.
func (s *Simulated) After(d time.Duration) <-chan time.Time {
after := make(chan time.Time, 1)
s.AfterFunc(d, func() {
Expand All @@ -114,8 +112,9 @@ func (s *Simulated) After(d time.Duration) <-chan time.Time {
return after
}

// AfterFunc implements Clock.
func (s *Simulated) AfterFunc(d time.Duration, do func()) Event {
// AfterFunc runs fn after the clock has advanced by d. Unlike with the system
// clock, fn runs on the goroutine that calls Run.
func (s *Simulated) AfterFunc(d time.Duration, fn func()) Timer {
s.mu.Lock()
defer s.mu.Unlock()
s.init()
Expand All @@ -133,44 +132,31 @@ func (s *Simulated) AfterFunc(d time.Duration, do func()) Event {
l = m + 1
}
}
s.scheduled = append(s.scheduled, event{})
ev := &simTimer{do: fn, at: at, s: s}
s.scheduled = append(s.scheduled, nil)
copy(s.scheduled[l+1:], s.scheduled[l:ll])
e := event{do: do, at: at, id: id}
s.scheduled[l] = e
s.scheduled[l] = ev
s.cond.Broadcast()
return &SimulatedEvent{at: at, id: id, s: s}
}

func (s *Simulated) init() {
if s.cond == nil {
s.cond = sync.NewCond(&s.mu)
}
return ev
}

// Cancel implements Event.
func (e *SimulatedEvent) Cancel() bool {
s := e.s
func (ev *simTimer) Stop() bool {
s := ev.s
s.mu.Lock()
defer s.mu.Unlock()

l, h := 0, len(s.scheduled)
ll := h
for l != h {
m := (l + h) / 2
if e.id == s.scheduled[m].id {
l = m
break
}
if (e.at < s.scheduled[m].at) || ((e.at == s.scheduled[m].at) && (e.id < s.scheduled[m].id)) {
h = m
} else {
l = m + 1
for i := 0; i < len(s.scheduled); i++ {
if s.scheduled[i] == ev {
s.scheduled = append(s.scheduled[:i], s.scheduled[i+1:]...)
s.cond.Broadcast()
return true
}
}
if l >= ll || s.scheduled[l].id != e.id {
return false
return false
}

func (s *Simulated) init() {
if s.cond == nil {
s.cond = sync.NewCond(&s.mu)
}
copy(s.scheduled[l:ll-1], s.scheduled[l+1:])
s.scheduled = s.scheduled[:ll-1]
return true
}
115 changes: 115 additions & 0 deletions common/mclock/simclock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package mclock

import (
"testing"
"time"
)

var _ Clock = System{}
var _ Clock = new(Simulated)

func TestSimulatedAfter(t *testing.T) {
const timeout = 30 * time.Minute
const adv = time.Minute

var (
c Simulated
end = c.Now().Add(timeout)
ch = c.After(timeout)
)
for c.Now() < end.Add(-adv) {
c.Run(adv)
select {
case <-ch:
t.Fatal("Timer fired early")
default:
}
}

c.Run(adv)
select {
case stamp := <-ch:
want := time.Time{}.Add(timeout)
if !stamp.Equal(want) {
t.Errorf("Wrong time sent on timer channel: got %v, want %v", stamp, want)
}
default:
t.Fatal("Timer didn't fire")
}
}

func TestSimulatedAfterFunc(t *testing.T) {
var c Simulated

called1 := false
timer1 := c.AfterFunc(100*time.Millisecond, func() { called1 = true })
if c.ActiveTimers() != 1 {
t.Fatalf("%d active timers, want one", c.ActiveTimers())
}
if fired := timer1.Stop(); !fired {
t.Fatal("Stop returned false even though timer didn't fire")
}
if c.ActiveTimers() != 0 {
t.Fatalf("%d active timers, want zero", c.ActiveTimers())
}
if called1 {
t.Fatal("timer 1 called")
}
if fired := timer1.Stop(); fired {
t.Fatal("Stop returned true after timer was already stopped")
}

called2 := false
timer2 := c.AfterFunc(100*time.Millisecond, func() { called2 = true })
c.Run(50 * time.Millisecond)
if called2 {
t.Fatal("timer 2 called")
}
c.Run(51 * time.Millisecond)
if !called2 {
t.Fatal("timer 2 not called")
}
if fired := timer2.Stop(); fired {
t.Fatal("Stop returned true after timer has fired")
}
}

func TestSimulatedSleep(t *testing.T) {
var (
c Simulated
timeout = 1 * time.Hour
done = make(chan AbsTime)
)
go func() {
c.Sleep(timeout)
done <- c.Now()
}()

c.WaitForTimers(1)
c.Run(2 * timeout)
select {
case stamp := <-done:
want := AbsTime(2 * timeout)
if stamp != want {
t.Errorf("Wrong time after sleep: got %v, want %v", stamp, want)
}
case <-time.After(5 * time.Second):
t.Fatal("Sleep didn't return in time")
}
}
6 changes: 3 additions & 3 deletions les/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type balanceTracker struct {
negTimeFactor, negRequestFactor float64
sumReqCost uint64
lastUpdate, nextUpdate, initTime mclock.AbsTime
updateEvent mclock.Event
updateEvent mclock.Timer
// since only a limited and fixed number of callbacks are needed, they are
// stored in a fixed size array ordered by priority threshold.
callbacks [balanceCallbackCount]balanceCallback
Expand Down Expand Up @@ -86,7 +86,7 @@ func (bt *balanceTracker) stop(now mclock.AbsTime) {
bt.timeFactor = 0
bt.requestFactor = 0
if bt.updateEvent != nil {
bt.updateEvent.Cancel()
bt.updateEvent.Stop()
bt.updateEvent = nil
}
}
Expand Down Expand Up @@ -235,7 +235,7 @@ func (bt *balanceTracker) checkCallbacks(now mclock.AbsTime) {

// updateAfter schedules a balance update and callback check in the future
func (bt *balanceTracker) updateAfter(dt time.Duration) {
if bt.updateEvent == nil || bt.updateEvent.Cancel() {
if bt.updateEvent == nil || bt.updateEvent.Stop() {
if dt == 0 {
bt.updateEvent = nil
} else {
Expand Down