Fix duplicate keys on intensive map insertion
puzpuzpuz committed Oct 27, 2022
1 parent 16b6419 commit f8849f8
Showing 4 changed files with 53 additions and 35 deletions.
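The bug this commit fixes: during intensive insertion, doStore could record an entry's top-hash bits in the wrong bucket of a chain (the last bucket scanned rather than the bucket owning the free slot), so later stores could miss the existing entry and insert the same key twice. The new tests detect exactly that by checking that Size (the maintained counter) agrees with the number of entries Range actually visits. Below is a minimal sketch of that detector, assuming the xsync v1 API (NewMap, Store, Size, Range); the key set and loop bounds are illustrative, not taken from the commit.

// Duplicate-key detector in the spirit of the tests added by this commit.
// Assumes the xsync v1 API; iteration counts are illustrative.
package main

import (
    "fmt"
    "strconv"

    "github.com/puzpuzpuz/xsync"
)

func main() {
    m := xsync.NewMap()
    // Intensive insertion over a small key set repeatedly hits the same
    // buckets and their overflow chains.
    for i := 0; i < 1000; i++ {
        for j := 0; j < 100; j++ {
            m.Store(strconv.Itoa(j), j)
        }
    }
    // Size maintains a counter; Range walks the buckets. If some key was
    // inserted twice, Range visits more entries than Size reports.
    visited := 0
    m.Range(func(key string, value interface{}) bool {
        visited++
        return true
    })
    fmt.Println(m.Size() == visited) // expect true after the fix
}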
map.go: 11 additions & 11 deletions
@@ -236,8 +236,8 @@ func (m *Map) doStore(key string, valueFn func() interface{}, loadIfExists bool) (interface{}, bool) {
     for {
     store_attempt:
         var (
-            emptykp, emptyvp *unsafe.Pointer
-            emptyidx         int
+            emptyb   *bucketPadded
+            emptyidx int
         )
         table := (*mapTable)(atomic.LoadPointer(&m.table))
         tableLen := len(table.buckets)
@@ -261,9 +261,8 @@ func (m *Map) doStore(key string, valueFn func() interface{}, loadIfExists bool) (interface{}, bool) {
         topHashes := atomic.LoadUint64(&b.topHashMutex)
         for i := 0; i < entriesPerMapBucket; i++ {
             if b.keys[i] == nil {
-                if emptykp == nil {
-                    emptykp = &b.keys[i]
-                    emptyvp = &b.values[i]
+                if emptyb == nil {
+                    emptyb = b
                     emptyidx = i
                 }
                 continue
@@ -293,13 +292,14 @@ func (m *Map) doStore(key string, valueFn func() interface{}, loadIfExists bool) (interface{}, bool) {
             }
         }
         if b.next == nil {
-            if emptykp != nil {
+            if emptyb != nil {
                 // Insertion case. First we update the value, then the key.
                 // This is important for atomic snapshot states.
-                atomic.StoreUint64(&b.topHashMutex, storeTopHash(hash, topHashes, emptyidx))
+                topHashes = atomic.LoadUint64(&emptyb.topHashMutex)
+                atomic.StoreUint64(&emptyb.topHashMutex, storeTopHash(hash, topHashes, emptyidx))
                 value := valueFn()
-                atomic.StorePointer(emptyvp, unsafe.Pointer(&value))
-                atomic.StorePointer(emptykp, unsafe.Pointer(&key))
+                atomic.StorePointer(&emptyb.values[emptyidx], unsafe.Pointer(&value))
+                atomic.StorePointer(&emptyb.keys[emptyidx], unsafe.Pointer(&key))
                 unlockBucket(&rootb.topHashMutex)
                 table.addSize(bidx, 1)
                 return value, false
@@ -316,7 +316,7 @@ func (m *Map) doStore(key string, valueFn func() interface{}, loadIfExists bool) (interface{}, bool) {
             newb.keys[0] = unsafe.Pointer(&key)
             value := valueFn()
             newb.values[0] = unsafe.Pointer(&value)
-            newb.topHashMutex = storeTopHash(hash, topHashes, emptyidx)
+            newb.topHashMutex = storeTopHash(hash, newb.topHashMutex, 0)
             atomic.StorePointer(&b.next, unsafe.Pointer(newb))
             unlockBucket(&rootb.topHashMutex)
             table.addSize(bidx, 1)
@@ -430,7 +430,7 @@ func appendToBucket(hash uint64, keyPtr, valPtr unsafe.Pointer, b *bucketPadded) {
     newb := new(bucketPadded)
     newb.keys[0] = keyPtr
     newb.values[0] = valPtr
-    newb.topHashMutex = storeTopHash(hash, b.topHashMutex, 0)
+    newb.topHashMutex = storeTopHash(hash, newb.topHashMutex, 0)
     b.next = unsafe.Pointer(newb)
     return
 }
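Two corrections above are worth spelling out. First, the old scan remembered raw slot pointers (emptykp, emptyvp), but by insertion time b pointed at the last bucket of the chain, so the top hash went into b.topHashMutex even when the free slot lived in an earlier bucket; the owning bucket then carried no top-hash record of the key, and a later store of the same key could claim a second slot. Tracking emptyb with emptyidx makes every write (top hashes, value, key) target the bucket that owns the slot. Second, a freshly allocated bucket now seeds its top hashes from its own zero word and slot 0, storeTopHash(hash, newb.topHashMutex, 0), rather than from the scanned bucket's snapshot and a stale emptyidx. The value-then-key order named in the comment is what keeps lock-free readers consistent; here is a minimal sketch of that publication order, with illustrative names (slot, publish) that are not part of xsync.

// Sketch of the value-then-key publication order the fix preserves.
// slot and publish are illustrative names, not the xsync API.
package main

import (
    "fmt"
    "sync/atomic"
    "unsafe"
)

type slot struct {
    key   unsafe.Pointer // *string; nil means the slot is empty
    value unsafe.Pointer // *interface{}
}

// publish stores the value before the key. A reader that observes a
// non-nil key pointer is therefore guaranteed to observe the complete
// value written for it: the "atomic snapshot" the doStore comment names.
func publish(s *slot, key string, value interface{}) {
    atomic.StorePointer(&s.value, unsafe.Pointer(&value))
    atomic.StorePointer(&s.key, unsafe.Pointer(&key))
}

func main() {
    s := new(slot)
    publish(s, "foo", 42)
    // Readers load in the opposite order: key first, then value.
    if k := atomic.LoadPointer(&s.key); k != nil {
        fmt.Println(*(*string)(k), *(*interface{})(atomic.LoadPointer(&s.value)))
    }
}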
map_test.go: 16 additions & 7 deletions
@@ -468,7 +468,7 @@ func parallelSeqResizer(t *testing.T, m *Map, numEntries int, positive bool, cdone chan bool) {
     cdone <- true
 }

-func TestMapParallelResizeGrowOnly(t *testing.T) {
+func TestMapParallelResize_GrowOnly(t *testing.T) {
     const numEntries = 100_000
     m := NewMap()
     cdone := make(chan bool)
@@ -494,22 +494,22 @@ func TestMapParallelResizeGrowOnly(t *testing.T) {

 func parallelRandResizer(t *testing.T, m *Map, numIters, numEntries int, cdone chan bool) {
     r := rand.New(rand.NewSource(time.Now().UnixNano()))
-    for i := 0; i < numEntries; i++ {
+    for i := 0; i < numIters; i++ {
         coin := r.Int63n(2)
         for j := 0; j < numEntries; j++ {
             if coin == 1 {
-                m.Store(strconv.Itoa(i), i)
+                m.Store(strconv.Itoa(j), j)
             } else {
-                m.Delete(strconv.Itoa(i))
+                m.Delete(strconv.Itoa(j))
             }
         }
     }
     cdone <- true
 }

 func TestMapParallelResize(t *testing.T) {
-    const numIters = 100
-    const numEntries = 1_000
+    const numIters = 1000
+    const numEntries = 2 * EntriesPerMapBucket * MinMapTableLen
     m := NewMap()
     cdone := make(chan bool)
     go parallelRandResizer(t, m, numIters, numEntries, cdone)
@@ -528,9 +528,18 @@ func TestMapParallelResize(t *testing.T) {
             t.Errorf("values do not match for %d: %v", i, v)
         }
     }
-    if s := m.Size(); s > numEntries {
+    s := m.Size()
+    if s > numEntries {
         t.Errorf("unexpected size: %v", s)
     }
+    rs := 0
+    m.Range(func(key string, value interface{}) bool {
+        rs++
+        return true
+    })
+    if s != rs {
+        t.Errorf("size does not match number of entries in Range: %v, %v", s, rs)
+    }
 }

 func parallelSeqStorer(t *testing.T, m *Map, storeEach, numIters, numEntries int, cdone chan bool) {
mapof.go: 10 additions & 10 deletions
@@ -166,8 +166,8 @@ func (m *MapOf[K, V]) doStore(key K, valueFn func() V, loadIfExists bool) (V, bool) {
     for {
     store_attempt:
         var (
-            emptykp, emptyvp *unsafe.Pointer
-            emptyidx         int
+            emptyb   *bucketPadded
+            emptyidx int
         )
         table := (*mapTable)(atomic.LoadPointer(&m.table))
         tableLen := len(table.buckets)
@@ -191,9 +191,8 @@ func (m *MapOf[K, V]) doStore(key K, valueFn func() V, loadIfExists bool) (V, bool) {
         topHashes := atomic.LoadUint64(&b.topHashMutex)
         for i := 0; i < entriesPerMapBucket; i++ {
             if b.keys[i] == nil {
-                if emptykp == nil {
-                    emptykp = &b.keys[i]
-                    emptyvp = &b.values[i]
+                if emptyb == nil {
+                    emptyb = b
                     emptyidx = i
                 }
                 continue
@@ -224,14 +223,15 @@ func (m *MapOf[K, V]) doStore(key K, valueFn func() V, loadIfExists bool) (V, bool) {
             }
         }
         if b.next == nil {
-            if emptykp != nil {
+            if emptyb != nil {
                 // Insertion case. First we update the value, then the key.
                 // This is important for atomic snapshot states.
-                atomic.StoreUint64(&b.topHashMutex, storeTopHash(hash, topHashes, emptyidx))
+                topHashes = atomic.LoadUint64(&emptyb.topHashMutex)
+                atomic.StoreUint64(&emptyb.topHashMutex, storeTopHash(hash, topHashes, emptyidx))
                 value := valueFn()
                 var wv interface{} = valueFn()
-                atomic.StorePointer(emptyvp, unsafe.Pointer(&wv))
-                atomic.StorePointer(emptykp, unsafe.Pointer(&key))
+                atomic.StorePointer(&emptyb.values[emptyidx], unsafe.Pointer(&wv))
+                atomic.StorePointer(&emptyb.keys[emptyidx], unsafe.Pointer(&key))
                 unlockBucket(&rootb.topHashMutex)
                 table.addSize(bidx, 1)
                 return value, false
@@ -249,7 +249,7 @@ func (m *MapOf[K, V]) doStore(key K, valueFn func() V, loadIfExists bool) (V, bool) {
             value := valueFn()
             var wv interface{} = value
             newb.values[0] = unsafe.Pointer(&wv)
-            newb.topHashMutex = storeTopHash(hash, topHashes, emptyidx)
+            newb.topHashMutex = storeTopHash(hash, newb.topHashMutex, 0)
             atomic.StorePointer(&b.next, unsafe.Pointer(newb))
             unlockBucket(&rootb.topHashMutex)
             table.addSize(bidx, 1)
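The generic variant mirrors the map.go changes. The extra var wv interface{} = value step exists because bucket slots hold an unsafe.Pointer to an interface{} box rather than to the typed V, so the value is boxed once and the pointer to the box is what gets published. A small illustration of that layout, with made-up names and not the xsync API:

// Why MapOf boxes values before storing them: a bucket slot holds an
// unsafe.Pointer to an interface{} value. Illustrative only.
package main

import (
    "fmt"
    "unsafe"
)

func main() {
    value := 42                 // a typed V
    var wv interface{} = value  // box it, as doStore does
    slot := unsafe.Pointer(&wv) // what a bucket slot would hold
    fmt.Println(*(*interface{})(slot)) // prints 42
}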
mapof_test.go: 16 additions & 7 deletions
@@ -606,7 +606,7 @@ func parallelSeqTypedResizer(t *testing.T, m *MapOf[int, int], numEntries int, positive bool, cdone chan bool) {
     cdone <- true
 }

-func TestMapOfParallelResizeGrowOnly(t *testing.T) {
+func TestMapOfParallelResize_GrowOnly(t *testing.T) {
     const numEntries = 100_000
     m := NewIntegerMapOf[int, int]()
     cdone := make(chan bool)
@@ -632,22 +632,22 @@ func TestMapOfParallelResizeGrowOnly(t *testing.T) {

 func parallelRandTypedResizer(t *testing.T, m *MapOf[int, int], numIters, numEntries int, cdone chan bool) {
     r := rand.New(rand.NewSource(time.Now().UnixNano()))
-    for i := 0; i < numEntries; i++ {
+    for i := 0; i < numIters; i++ {
         coin := r.Int63n(2)
         for j := 0; j < numEntries; j++ {
             if coin == 1 {
-                m.Store(i, i)
+                m.Store(j, j)
             } else {
-                m.Delete(i)
+                m.Delete(j)
             }
         }
     }
     cdone <- true
 }

 func TestMapOfParallelResize(t *testing.T) {
-    const numIters = 100
-    const numEntries = 1_000
+    const numIters = 1000
+    const numEntries = 2 * EntriesPerMapBucket * MinMapTableLen
     m := NewIntegerMapOf[int, int]()
     cdone := make(chan bool)
     go parallelRandTypedResizer(t, m, numIters, numEntries, cdone)
@@ -666,9 +666,18 @@ func TestMapOfParallelResize(t *testing.T) {
             t.Errorf("values do not match for %d: %v", i, v)
         }
     }
-    if s := m.Size(); s > numEntries {
+    s := m.Size()
+    if s > numEntries {
         t.Errorf("unexpected size: %v", s)
     }
+    rs := 0
+    m.Range(func(key int, value int) bool {
+        rs++
+        return true
+    })
+    if s != rs {
+        t.Errorf("size does not match number of entries in Range: %v, %v", s, rs)
+    }
 }

 func parallelSeqTypedStorer(t *testing.T, m *MapOf[string, int], storeEach, numIters, numEntries int, cdone chan bool) {
