forked from cespare/percpu
-
Notifications
You must be signed in to change notification settings - Fork 0
/
percpu.go
113 lines (99 loc) · 3.22 KB
/
percpu.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// Package percpu provides best-effort CPU-local sharded values.
package percpu
import (
"golang.org/x/sys/cpu"
"runtime"
"sync/atomic"
_ "unsafe"
)
// Values is a sharded set of values which have an affinity for a particular
// processor. This can be used to avoid cache contention when updating a shared
// value simultaneously from many goroutines.
//
// A zero value of a Values is ready to use.
// Values must not be copied after first use.
type Values[T any] struct {
	pad1 cpu.CacheLinePad // prevent false sharing with whatever precedes this struct in memory

	// shards keeps the per-CPU pointers, indexed by the P ID reported by
	// getProcID. It grows (copy-and-CAS in Get) in case GOMAXPROCS is
	// increased. Never shrinks, so indices handed out earlier stay valid.
	shards atomic.Pointer[[]*padded[T]]

	pad2 cpu.CacheLinePad // prevent false sharing with whatever follows this struct in memory
}
// padded wraps one shard's value in cache-line padding on both sides so
// that two adjacent shards never occupy the same cache line (avoids
// false sharing between CPUs updating neighboring shards).
type padded[T any] struct {
	pad1 cpu.CacheLinePad // prevent false sharing
	v    T
	pad2 cpu.CacheLinePad // prevent false sharing
}
// Get returns a pointer to one of the values held by v, preferring the
// shard associated with the processor the calling goroutine is currently
// running on.
//
// Goroutines migrate between processors at any time, so another goroutine
// may concurrently hold the same pointer; callers must synchronize all
// access to the pointed-to value themselves.
//
// If the shard for the current CPU does not exist yet, Get allocates a
// fresh zero value inside a padded block, which prevents false sharing.
// Only the language's standard alignment guarantees apply: on 32-bit
// systems a 64-bit integer is NOT necessarily 64-bit aligned (see the
// Bugs section of sync/atomic).
//
// Every pointer handed out by Get remains visible to Range for the
// lifetime of v; individual values cannot be freed.
func (v *Values[T]) Get() *T {
	id := getProcID()
	table := v.shards.Load()
	// Grow (or initialize) the shard table until it covers id.
	for table == nil || id >= len(*table) {
		want := runtime.GOMAXPROCS(0)
		if want <= id {
			// GOMAXPROCS may have been raised and then lowered again,
			// leaving id beyond the current setting. Always make room
			// for the shard we are about to index.
			want = id + 1
		}
		grown := make([]*padded[T], want)
		kept := 0
		if table != nil {
			kept = copy(grown, *table)
		}
		for i := kept; i < want; i++ {
			grown[i] = new(padded[T])
		}
		if v.shards.CompareAndSwap(table, &grown) {
			table = &grown
			break
		}
		// Lost the publish race to another grower: reload and re-check.
		table = v.shards.Load()
	}
	return &(*table)[id].v
}
// Range invokes fn once for every value currently held by v.
//
// fn may run zero or more times, and it can observe a shard that no
// goroutine calling Get has touched yet (i.e. still the zero value).
//
// The pointers passed to fn may be in concurrent use by other
// goroutines; the caller is responsible for synchronizing access.
func (v *Values[T]) Range(fn func(p *T)) {
	table := v.shards.Load()
	if table == nil {
		return
	}
	for i := range *table {
		fn(&(*table)[i].v)
	}
}
// runtime_procPin pins the calling goroutine to its current P and
// returns that P's ID (linked to the runtime's private procPin).
//
//go:linkname runtime_procPin runtime.procPin
func runtime_procPin() int

// runtime_procUnpin releases the pin taken by runtime_procPin.
//
//go:linkname runtime_procUnpin runtime.procUnpin
func runtime_procUnpin() int

// getProcID returns a best-effort identifier of the processor (P) the
// calling goroutine is running on. The pin is released immediately, so
// the goroutine may migrate right after this returns — the result is
// only a hint, which is all the sharding in Values needs.
func getProcID() int {
	pid := runtime_procPin()
	runtime_procUnpin()
	return pid
}