-
Notifications
You must be signed in to change notification settings - Fork 0
/
CLL.jl
146 lines (123 loc) · 4.72 KB
/
CLL.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# This file is a part of Julia. License is MIT: https://julialang.org/license
# Also see `work-stealing-queue.h` this is a pure Julia re-implementation
# =======
# Chase and Lev's work-stealing queue, optimized for
# weak memory models by Le et al.
#
# * Chase D., Lev Y. Dynamic Circular Work-Stealing queue
# * Le N. M. et al. Correct and Efficient Work-Stealing for
# Weak Memory Models
# =======
module CLL
import ..WorkstealingQueues: push!, pushfirst!, pushlast!, pop!, steal!
# mutable so that we don't get a mutex in WSQueue
mutable struct WSBuffer{T}
    const buffer::AtomicMemory{T}
    const capacity::Int64
    const mask::Int64
    # Fixed-capacity circular buffer backing a WSQueue.
    # `capacity` must be a positive power of two so logical indices can be
    # wrapped with a bitmask (`mask = capacity - 1`) instead of a modulo.
    #
    # The original checks (`capacity == 0`, `count_ones(capacity) != 1`)
    # accepted `typemin(Int64)` (only the sign bit set) and reported negative
    # capacities as "must be a power of two"; `capacity <= 0` + `ispow2`
    # rejects all non-positive values with the right message.
    @noinline function WSBuffer{T}(capacity::Int64) where T
        if __unlikely(capacity <= 0)
            throw(ArgumentError("Capacity must be positive"))
        end
        if __unlikely(!ispow2(capacity))
            throw(ArgumentError("Capacity must be a power of two"))
        end
        buffer = AtomicMemory{T}(undef, capacity)
        mask = capacity - 1
        return new(buffer, capacity, mask)
    end
end
# Atomic read of logical slot `idx`, wrapped into the circular buffer.
function Base.getindex_atomic(buf::WSBuffer{T}, order::Symbol, idx::Int64) where T
    slot = ((idx - 1) & buf.mask) + 1  # 1-based wrap via power-of-two mask
    return @inbounds Base.getindex_atomic(buf.buffer, order, slot)
end
# Atomic write of `val` into logical slot `idx`, wrapped into the circular buffer.
function Base.setindex_atomic!(buf::WSBuffer{T}, order::Symbol, val::T, idx::Int64) where T
    slot = ((idx - 1) & buf.mask) + 1  # 1-based wrap via power-of-two mask
    return @inbounds Base.setindex_atomic!(buf.buffer, order, val, slot)
end
# Atomic read-modify-write (`op`) on logical slot `idx`, wrapped into the buffer.
function Base.modifyindex_atomic!(buf::WSBuffer{T}, order::Symbol, op, val::T, idx::Int64) where T
    slot = ((idx - 1) & buf.mask) + 1  # 1-based wrap via power-of-two mask
    return @inbounds Base.modifyindex_atomic!(buf.buffer, order, op, val, slot)
end
# Atomic swap: store `val` into logical slot `idx` and return the old value.
function Base.swapindex_atomic!(buf::WSBuffer{T}, order::Symbol, val::T, idx::Int64) where T
    slot = ((idx - 1) & buf.mask) + 1  # 1-based wrap via power-of-two mask
    return @inbounds Base.swapindex_atomic!(buf.buffer, order, val, slot)
end
# Atomic compare-and-swap on logical slot `idx`, wrapped into the circular buffer.
function Base.replaceindex_atomic!(buf::WSBuffer{T}, success_order::Symbol, fail_order::Symbol, expected::T, desired::T, idx::Int64) where T
    slot = ((idx - 1) & buf.mask) + 1  # 1-based wrap via power-of-two mask
    return @inbounds Base.replaceindex_atomic!(buf.buffer, success_order, fail_order, expected, desired, slot)
end
# Copy every slot of `src` into `dst`; `dst` must be at least as large.
# Returns `dst`, matching the `Base.copyto!` contract (the original body
# implicitly returned `nothing`). Called only from the owner thread while
# growing the queue's buffer, so relaxed stores suffice.
# NOTE(review): the RHS `src.buffer[i]` is a plain (non-atomic) read of an
# AtomicMemory slot — confirm this is legal on the targeted Julia version,
# or read it with an explicit :monotonic order.
function Base.copyto!(dst::WSBuffer{T}, src::WSBuffer{T}) where T
    @assert dst.capacity >= src.capacity
    for i in eachindex(src.buffer)
        @inbounds @atomic :monotonic dst.buffer[i] = src.buffer[i]
    end
    return dst
end
"""
WSQueue{T}
Work-stealing queue after Chase & Le.
!!! note
popfirst! and push! are only allowed to be called from owner.
"""
mutable struct WSQueue{T}
@atomic top::Int64
@atomic bottom::Int64
@atomic buffer::WSBuffer{T}
function WSQueue{T}(capacity = 64) where T
new(1, 1, WSBuffer{T}(capacity))
end
end
# Owner-side enqueue at the bottom end of the deque.
# Only the owner thread may call this (it is the only writer of `bottom`).
function Base.push!(q::WSQueue{T}, v::T) where T
    bottom = @atomic :monotonic q.bottom  # owner-private index; relaxed load suffices
    top = @atomic :acquire q.top          # synchronizes with thieves advancing `top`
    buffer = @atomic :monotonic q.buffer
    # Full? `bottom - top` is the number of live entries.
    if __unlikely(bottom - top > (buffer.capacity - 1))
        # @debug "Growing WS buffer" bottom top capacity = buffer.capacity
        new_buffer = WSBuffer{T}(2*buffer.capacity)
        copyto!(new_buffer, buffer) # TODO only copy active range?
        @atomic :release q.buffer = new_buffer  # publish the grown buffer to thieves
        buffer = new_buffer
    end
    @atomic :monotonic buffer[bottom] = v
    # Release fence orders the element store before the `bottom` bump below, so a
    # thief that observes the new `bottom` also observes the stored element.
    Core.Intrinsics.atomic_fence(:release)
    @atomic :monotonic q.bottom = bottom + 1
    return nothing
end
# Owner-side dequeue from the bottom end (LIFO for the owner).
# Returns the element, or `nothing` when the queue is empty or the last
# remaining element was lost to a concurrent `steal!`.
function Base.popfirst!(q::WSQueue{T}) where T
    bottom = (@atomic :monotonic q.bottom) - 1
    buffer = @atomic :monotonic q.buffer
    @atomic :monotonic q.bottom = bottom  # speculatively claim the slot
    # Full fence: the `bottom` store must be visible before the `top` load, so
    # the owner and a thief cannot both claim the same element.
    Core.Intrinsics.atomic_fence(:sequentially_consistent) # TODO slow on AMD
    top = @atomic :monotonic q.top
    if __likely(top <= bottom)
        v = @atomic :monotonic buffer[bottom]
        if top == bottom
            # Exactly one element left: race thieves for it via CAS on `top`.
            _, success = @atomicreplace q.top top => top+1
            @atomic :monotonic q.bottom = bottom + 1  # restore; queue is now empty
            if !success
                return nothing # failed
            end
        end
        return v
    else
        # Queue was already empty; undo the speculative decrement of `bottom`.
        @atomic :monotonic q.bottom = bottom + 1
        return nothing # failed
    end
end
# Thief-side dequeue from the top end; safe to call from any thread.
# Returns the element, or `nothing` when the queue appears empty or the
# CAS on `top` lost a race to another thread.
function steal!(q::WSQueue{T}) where T
    top = @atomic :acquire q.top
    # Full fence: the `top` load must happen before the `bottom` load so the
    # emptiness check below cannot act on a reordered (stale) `bottom`.
    Core.Intrinsics.atomic_fence(:sequentially_consistent)
    bottom = @atomic :acquire q.bottom
    if top < bottom
        buffer = @atomic :monotonic q.buffer
        v = @atomic :monotonic buffer[top]
        # Claim the element; failure means the owner or another thief took it.
        _, success = @atomicreplace q.top top => top+1
        if !success
            return nothing # failed
        end
        return v
    end
    return nothing # failed
end
# `pop!` is an owner-only alias for `popfirst!`.
function Base.pop!(q::WSQueue)
    return popfirst!(q)
end
# Branch-prediction hints lowered to `llvm.expect`; both are semantically the
# identity function on `cond` and only bias codegen.
@inline function __likely(cond::Bool)
    return ccall("llvm.expect", llvmcall, Bool, (Bool, Bool), cond, true)
end
@inline function __unlikely(cond::Bool)
    return ccall("llvm.expect", llvmcall, Bool, (Bool, Bool), cond, false)
end
# Report whether the queue currently holds no elements.
# `top` and `bottom` are `@atomic` fields, so they must be read with an
# explicit ordering — the original plain `q.top == q.bottom` access throws a
# ConcurrencyViolationError at runtime. The result is only a snapshot and may
# be stale by the time the caller acts on it.
function Base.isempty(q::WSQueue)
    top = @atomic :acquire q.top
    bottom = @atomic :acquire q.bottom
    return top == bottom
end
end #module