Skip to content

Commit

Permalink
runtime: reduce timer latency
Browse files Browse the repository at this point in the history
Change the scheduler to consider P's not marked for preemption as
potential targets for timer stealing by spinning P's.

Ignoring timers on P's not marked for preemption, as the scheduler
did previously, has the downside that timers on such a P must wait
for its current G to complete or get preempted, which can take as
long as 10ms.

In addition, this choice is only made when a spinning P is available,
and in timer-bound applications it may result in the spinning P
stopping instead of performing available work, reducing parallelism.

In CL 214185 we avoided taking the timer lock of a P with no ready
timers, which reduces the chances of timer lock contention.

Fixes golang#38860

Change-Id: If52680509b0f3b66dbd1d0c13fa574bd2d0bbd57
  • Loading branch information
ChrisHines committed May 7, 2020
1 parent b5f7ff4 commit 0ec80b8
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 29 deletions.
46 changes: 17 additions & 29 deletions src/runtime/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2213,14 +2213,18 @@ top:
return gp, false
}

// Consider stealing timers from p2.
// This call to checkTimers is the only place where
// we hold a lock on a different P's timers.
// Lock contention can be a problem here, so avoid
// grabbing the lock if p2 is running and not marked
// for preemption. If p2 is running and not being
// preempted we assume it will handle its own timers.
if i > 2 && shouldStealTimers(p2) {
// Steal timers from p2. This call to checkTimers is the only place
// where we might hold a lock on a different P's timers. We do this
// when i == 2 because that is the pass before runqsteal considers
// G's in the runnext slot of p2. Stealing from the other P's
// runnext should be the last resort, so if there are timers to
// steal, do that first.
//
// We only check timers on one of the stealing iterations because
// the time stored in now doesn't change in this loop and checking
// the timers for each P more than once with the same value of now
// is probably a waste of time.
if i == 2 {
tnow, w, ran := checkTimers(p2, now)
now = tnow
if w != 0 && (pollUntil == 0 || w < pollUntil) {
Expand Down Expand Up @@ -2447,6 +2451,10 @@ func wakeNetPoller(when int64) {
if pollerPollUntil == 0 || pollerPollUntil > when {
netpollBreak()
}
} else {
// There are no threads in the network poller, try to get
// one there so it can handle new timers.
wakep()
}
}

Expand Down Expand Up @@ -2728,25 +2736,6 @@ func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
return rnow, pollUntil, ran
}

// shouldStealTimers reports whether it is worthwhile to try stealing
// the timers from p2. Timers on a P that is running and has not been
// asked to preempt are left alone, on the assumption that the P will
// service them itself; skipping such P's reduces contention on the
// timers lock.
func shouldStealTimers(p2 *p) bool {
	// A P that is not currently running cannot service its own timers.
	if p2.status != _Prunning {
		return true
	}
	// p2 is running: steal only if its current G is preemptible and
	// has already been marked for preemption.
	m2 := p2.m.ptr()
	if m2 == nil || m2.locks > 0 {
		return false
	}
	curg := m2.curg
	return curg != nil && curg.atomicstatus == _Grunning && curg.preempt
}

func parkunlock_c(gp *g, lock unsafe.Pointer) bool {
unlock((*mutex)(lock))
return true
Expand Down Expand Up @@ -4628,8 +4617,7 @@ func sysmon() {
}
}
if next < now {
// There are timers that should have already run,
// perhaps because there is an unpreemptible P.
// There are timers that should have already run.
// Try to start an M to run them.
startm(nil, false)
}
Expand Down
70 changes: 70 additions & 0 deletions src/time/sleep_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
package time_test

import (
"context"
"errors"
"fmt"
"runtime"
"runtime/trace"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -501,3 +503,71 @@ func TestZeroTimerStopPanics(t *testing.T) {
var tr Timer
tr.Stop()
}

func TestTimerParallelism(t *testing.T) {
	procs := runtime.GOMAXPROCS(0)
	if procs < 2 || runtime.NumCPU() < procs {
		t.Skip("skipping with GOMAXPROCS < 2 or NumCPU < GOMAXPROCS")
	}

	// spin busy-loops for roughly dur without yielding to the scheduler.
	spin := func(dur Duration) {
		for begin := Now(); Since(begin) < dur; {
		}
	}

	var wg sync.WaitGroup

	// Warm up the scheduler's thread pool. Without this step the time to start
	// new threads as the parallelism increases on high CPU count machines can
	// cause false test failures.
	var started int32
	wg.Add(procs)
	for i := 0; i < procs; i++ {
		go func() {
			defer wg.Done()
			atomic.AddInt32(&started, 1)
			for atomic.LoadInt32(&started) < int32(procs) {
				// spin until all threads started
			}
			// spin a bit more to ensure they are all running on separate CPUs.
			trace.Log(context.Background(), "event", "spinning hot")
			spin(10 * Millisecond)
		}()
	}
	wg.Wait()

	// Let most of the threads stop. Sleeping for a bit also puts some space
	// between the warm up and main test when looking at execution trace data
	// for this test.
	Sleep(5 * Millisecond)

	const (
		delay = Millisecond
		grace = 4 * Millisecond
	)

	for round := 0; round < 10; round++ {
		wg.Add(procs - 1)
		for i := 0; i < procs-1; i++ {
			// name and expectedWakeup are declared per iteration so each
			// AfterFunc closure captures its own copies.
			name := fmt.Sprintf("timerAfter-%d-%d", round, i)

			trace.Log(context.Background(), "event", "AfterFunc")
			expectedWakeup := Now().Add(delay)
			AfterFunc(delay, func() {
				if late := Since(expectedWakeup); late > grace {
					trace.Log(context.Background(), "event", "late wakeup")
					t.Errorf("%s: wakeup late by %v", name, late)
				}
				spin(10 * Millisecond)
				wg.Done()
			})
		}

		// Keep this P busy so the timers must fire elsewhere.
		spin(50 * Millisecond)
	}

	wg.Wait()
}

0 comments on commit 0ec80b8

Please sign in to comment.