diff --git a/db_test.go b/db_test.go index f1f124cf882..aa6008ac1f1 100644 --- a/db_test.go +++ b/db_test.go @@ -680,6 +680,7 @@ func TestIterLeak(t *testing.T) { t.Fatal(err) } } else { + defer iter.Close() if err := d.Close(); err == nil { t.Fatalf("expected failure, but found success") } else if !strings.HasPrefix(err.Error(), "leaked iterators:") { @@ -705,7 +706,10 @@ func TestMemTableReservation(t *testing.T) { // Add a block to the cache. Note that the memtable size is larger than the // cache size, so opening the DB should cause this block to be evicted. tmpID := opts.Cache.NewID() - opts.Cache.Set(tmpID, 0, 0, []byte("hello world")) + helloWorld := []byte("hello world") + value := opts.Cache.AllocManual(len(helloWorld)) + copy(value.Buf(), helloWorld) + opts.Cache.Set(tmpID, 0, 0, value).Release() d, err := Open("", opts) if err != nil { diff --git a/internal/cache/alloc.go b/internal/cache/alloc.go index 846a195a3a6..7510e6d7ed8 100644 --- a/internal/cache/alloc.go +++ b/internal/cache/alloc.go @@ -5,9 +5,11 @@ package cache import ( + "runtime" "sync" "time" + "github.com/cockroachdb/pebble/internal/manual" "golang.org/x/exp/rand" ) @@ -73,12 +75,20 @@ func newAllocCache() *allocCache { bufs: make([][]byte, 0, allocCacheCountLimit), } c.rnd.Seed(uint64(time.Now().UnixNano())) + runtime.SetFinalizer(c, freeAllocCache) return c } +func freeAllocCache(obj interface{}) { + c := obj.(*allocCache) + for i := range c.bufs { + manual.Free(c.bufs[i]) + } +} + func (c *allocCache) alloc(n int) []byte { if n < allocCacheMinSize || n >= allocCacheMaxSize { - return make([]byte, n) + return manual.New(n) } class := sizeToClass(n) @@ -92,12 +102,13 @@ func (c *allocCache) alloc(n int) []byte { } } - return make([]byte, n, classToSize(class)) + return manual.New(classToSize(class))[:n] } func (c *allocCache) free(b []byte) { n := cap(b) if n < allocCacheMinSize || n >= allocCacheMaxSize { + manual.Free(b) return } b = b[:n:n] @@ -117,6 +128,7 @@ func (c *allocCache) free(b []byte) { // are biased, but that is fine for the usage here. j := (uint32(len(c.bufs)) * (uint32(c.rnd.Uint64()) & ((1 << 16) - 1))) >> 16 c.size -= cap(c.bufs[j]) + manual.Free(c.bufs[j]) c.bufs[i], c.bufs[j] = nil, c.bufs[i] c.bufs = c.bufs[:i] } diff --git a/internal/cache/alloc_test.go b/internal/cache/alloc_test.go index d5a54051844..a5b83022ed5 100644 --- a/internal/cache/alloc_test.go +++ b/internal/cache/alloc_test.go @@ -7,12 +7,14 @@ package cache import ( "testing" "unsafe" + + "github.com/cockroachdb/pebble/internal/manual" ) func TestAllocCache(t *testing.T) { c := newAllocCache() for i := 0; i < 64; i++ { - c.free(make([]byte, 1025)) + c.free(manual.New(1025)) if c.size == 0 { t.Fatalf("expected cache size to be non-zero") } @@ -34,7 +36,7 @@ func TestAllocCache(t *testing.T) { func TestAllocCacheEvict(t *testing.T) { c := newAllocCache() for i := 0; i < allocCacheCountLimit; i++ { - c.free(make([]byte, 1024)) + c.free(manual.New(1024)) } bufs := make([][]byte, allocCacheCountLimit) @@ -61,7 +63,7 @@ func BenchmarkAllocCache(b *testing.B) { // Populate the cache with buffers if one size class. c := newAllocCache() for i := 0; i < allocCacheCountLimit; i++ { - c.free(make([]byte, 1024)) + c.free(manual.New(1024)) } // Benchmark allocating buffers of a different size class. 
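Illustrative sketch, not part of the patch: how the reworked allocCache in internal/cache/alloc.go is meant to be used from inside the package. The function name exampleAllocCacheUsage is hypothetical; newAllocCache, alloc, free, freeAllocCache and manual.New/manual.Free are the identifiers introduced or used in the hunks above.

func exampleAllocCacheUsage() {
	c := newAllocCache()

	// Buffers now come from manually managed (C.malloc'd) memory via
	// manual.New rather than the Go heap, so every buffer handed out by
	// alloc must eventually be handed back to free.
	buf := c.alloc(1024)
	copy(buf, "hello world")

	// free either pools the buffer for reuse or returns it to manual.Free.
	c.free(buf)

	// Buffers still pooled when the allocCache itself becomes unreachable
	// are released by the freeAllocCache finalizer installed in
	// newAllocCache.
}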
diff --git a/internal/cache/clockpro.go b/internal/cache/clockpro.go index 1e47d22dfbf..7c9ee7fce7d 100644 --- a/internal/cache/clockpro.go +++ b/internal/cache/clockpro.go @@ -18,10 +18,15 @@ package cache // import "github.com/cockroachdb/pebble/internal/cache" import ( + "fmt" + "os" "runtime" + "strings" "sync" "sync/atomic" "unsafe" + + "github.com/cockroachdb/pebble/internal/invariants" ) type entryType int8 @@ -55,29 +60,75 @@ type key struct { offset uint64 } -type value struct { +// Value holds a reference counted immutable value. +type Value struct { buf []byte // The number of references on the value. When refs drops to 0, the buf // associated with the value may be reused. This is a form of manual memory // management. See Cache.Free. refs int32 + // Is buf automatically managed (i.e. it will be reclaimed by GC) or manually + // managed? + auto bool + + traces []string } -func newValue(b []byte) *value { +func newValue(b []byte) *Value { if b == nil { return nil } - // A value starts with 2 references. One for the cache, and one for the - // handle that will be returned. - return &value{buf: b, refs: 2} + // A value starts with an invalid reference count. When the value is added to + // the cache, the reference count will be set to 2: one reference for the + // cache, and another for the returned Handle. + return &Value{buf: b, refs: -1} +} + +// Buf returns the buffer associated with the value. The contents of the buffer +// should not be changed once the value has been added to the cache. Instead, a +// new Value should be created and added to the cache to replace the existing +// value. +func (v *Value) Buf() []byte { + if v == nil { + return nil + } + return v.buf +} + +// Truncate the buffer to the specified length. The buffer length should not be +// changed once the value has been added to the cache. Instead, a new Value +// should be created and added to the cache to replace the existing value. +func (v *Value) Truncate(n int) { + v.buf = v.buf[:n] } -func (v *value) acquire() { +func (v *Value) makeWeak() { + if !v.auto { + panic("pebble: cannot make auto Value into a weak Value") + } + // Add a reference to the value which will never be cleared. This is + // necessary because WeakHandle.Get() performs an atomic load of the value, + // but we need to ensure that nothing can concurrently be freeing the buffer + // for reuse. Rather than add additional locking to this code path, we add a + // reference here so that the underlying buffer can never be reused. And we + // rely on the Go runtime to eventually GC the value and run the associated + // finalizer. 
+ v.acquire() +} + +func (v *Value) acquire() { atomic.AddInt32(&v.refs, 1) + v.trace("acquire") +} + +func (v *Value) release() bool { + n := atomic.AddInt32(&v.refs, -1) + v.trace("release") + return n == 0 } -func (v *value) release() bool { - return atomic.AddInt32(&v.refs, -1) == 0 +func (v *Value) trace(msg string) { + // v.traces = append(v.traces, fmt.Sprintf("%s: refs=%d\n%s", msg, atomic.LoadInt32(&v.refs), debug.Stack())) } type entry struct { @@ -153,29 +204,20 @@ func (e *entry) unlinkFile() *entry { return next } -func (e *entry) setValue(v *value) { +func (e *entry) setValue(v *Value) { if old := e.getValue(); old != nil { if old.release() { - allocFree(old.buf) + if !old.auto { + allocFree(old.buf) + } + old.buf = nil } } atomic.StorePointer(&e.val, unsafe.Pointer(v)) } -func (e *entry) getValue() *value { - return (*value)(atomic.LoadPointer(&e.val)) -} - -func (e *entry) Get() []byte { - v := e.getValue() - if v == nil { - return nil - } - atomic.StoreInt32(&e.referenced, 1) - // Record a cache hit because the entry is being used as a WeakHandle and - // successfully avoided a more expensive shard.Get() operation. - atomic.AddInt64(&e.shard.hits, 1) - return v.buf +func (e *entry) getValue() *Value { + return (*Value)(atomic.LoadPointer(&e.val)) } // Handle provides a strong reference to an entry in the cache. The reference @@ -183,7 +225,7 @@ func (e *entry) Get() []byte { // slice from being reused. type Handle struct { entry *entry - value *value + value *Value } // Get returns the value stored in handle. @@ -200,7 +242,10 @@ func (h Handle) Get() []byte { func (h Handle) Release() { if h.value != nil { if h.value.release() { - allocFree(h.value.buf) + if !h.value.auto { + allocFree(h.value.buf) + } + h.value.buf = nil } } } @@ -211,26 +256,33 @@ func (h Handle) Release() { // the reference count on the value is incremented which will prevent the // associated buffer from ever being reused until it is GC'd by the Go // runtime. It is not necessary to call Handle.Release() after calling Weak(). -func (h Handle) Weak() WeakHandle { +func (h Handle) Weak() *WeakHandle { if h.entry == nil { return nil // return a nil interface, not (*entry)(nil) } - // Add a reference to the value which will never be cleared. This is - // necessary because WeakHandle.Get() performs an atomic load of the value, - // but we need to ensure that nothing can concurrently be freeing the buffer - // for reuse. Rather than add additional locking to this code path, we add a - // reference here so that the underlying buffer can never be reused. And we - // rely on the Go runtime to eventually GC the buffer. - h.value.acquire() - return h.entry + h.value.makeWeak() + return (*WeakHandle)(h.entry) } // WeakHandle provides a "weak" reference to an entry in the cache. A weak // reference allows the entry to be evicted, but also provides fast access -type WeakHandle interface { - // Get retrieves the value associated with the weak handle, returning nil if - // no value is present. - Get() []byte +type WeakHandle entry + +// Get retrieves the value associated with the weak handle, returning nil if no +// value is present. The calls to Get must be balanced with the calls to +// Release. +func (h *WeakHandle) Get() []byte { + e := (*entry)(h) + v := e.getValue() + if v == nil { + return nil + } + + atomic.StoreInt32(&e.referenced, 1) + // Record a cache hit because the entry is being used as a WeakHandle and + // successfully avoided a more expensive shard.Get() operation. 
+ atomic.AddInt64(&e.shard.hits, 1) + return v.buf } type shard struct { @@ -257,7 +309,7 @@ type shard struct { func (c *shard) Get(id, fileNum, offset uint64) Handle { c.mu.RLock() e := c.blocks[key{fileKey{id, fileNum}, offset}] - var value *value + var value *Value if e != nil { value = e.getValue() if value != nil { @@ -276,54 +328,72 @@ func (c *shard) Get(id, fileNum, offset uint64) Handle { return Handle{value: value} } -func (c *shard) Set(id, fileNum, offset uint64, value []byte) Handle { +func (c *shard) Set(id, fileNum, offset uint64, value *Value) Handle { + if n := atomic.LoadInt32(&value.refs); n != -1 { + panic(fmt.Sprintf("pebble: Value has already been added to the cache: refs=%d", n)) + } + // Set the reference count to 2: one for the cache, and one for the returned + // Handle. + value.refs = 2 + c.mu.Lock() defer c.mu.Unlock() k := key{fileKey{id, fileNum}, offset} e := c.blocks[k] - v := newValue(value) switch { case e == nil: // no cache entry? add it - e = &entry{ptype: etCold, key: k, size: int64(len(value)), shard: c} + e = &entry{ptype: etCold, key: k, size: int64(len(value.buf)), shard: c} e.init() - e.setValue(v) + e.setValue(value) if c.metaAdd(k, e) { + value.trace("add-cold-1") c.sizeCold += e.size + } else { + value.trace("skip-cold-1") + value.release() } case e.getValue() != nil: // cache entry was a hot or cold page - e.setValue(v) + e.setValue(value) atomic.StoreInt32(&e.referenced, 1) - delta := int64(len(value)) - e.size - e.size = int64(len(value)) + delta := int64(len(value.buf)) - e.size + e.size = int64(len(value.buf)) if e.ptype == etHot { + value.trace("add-hot-1") c.sizeHot += delta } else { + value.trace("add-cold-2") c.sizeCold += delta } c.evict() default: // cache entry was a test page + c.metaDel(e) + c.sizeTest -= e.size + c.coldSize += e.size if c.coldSize > c.targetSize() { c.coldSize = c.targetSize() } + atomic.StoreInt32(&e.referenced, 0) - e.setValue(v) + e.setValue(value) e.ptype = etHot - c.sizeTest -= e.size - c.metaDel(e) if c.metaAdd(k, e) { + value.trace("add-hot-2") c.sizeHot += e.size + } else { + value.trace("skip-hot-1") + value.release() } } - return Handle{entry: e, value: v} + return Handle{entry: e, value: value} } // Delete deletes the cached value for the specified file and offset. @@ -415,6 +485,11 @@ func (c *shard) metaAdd(key key, e *entry) bool { } func (c *shard) metaDel(e *entry) { + if value := e.getValue(); value != nil { + value.trace("metaDel") + } + e.setValue(nil) + delete(c.blocks, e.key) if e == c.handHot { @@ -553,7 +628,20 @@ type Cache struct { // New creates a new cache of the specified size. Memory for the cache is // allocated on demand, not during initialization. func New(size int64) *Cache { - return newShards(size, 2*runtime.NumCPU()) + c := newShards(size, 2*runtime.NumCPU()) + runtime.SetFinalizer(c, clearCache) + return c +} + +func clearCache(obj interface{}) { + c := obj.(*Cache) + for i := range c.shards { + s := &c.shards[i] + s.mu.Lock() + s.maxSize = 0 + s.evict() + s.mu.Unlock() + } } func newShards(size int64, shards int) *Cache { @@ -611,8 +699,8 @@ func (c *Cache) Get(id, fileNum, offset uint64) Handle { // Set sets the cache value for the specified file and offset, overwriting an // existing value if present. A Handle is returned which provides faster // retrieval of the cached value than Get (lock-free and avoidance of the map -// lookup). -func (c *Cache) Set(id, fileNum, offset uint64, value []byte) Handle { +// lookup). The value must have been allocated by Cache.Alloc. 
+func (c *Cache) Set(id, fileNum, offset uint64, value *Value) Handle { return c.getShard(id, fileNum, offset).Set(id, fileNum, offset, value) } @@ -645,16 +733,49 @@ func (c *Cache) Size() int64 { return size } -// Alloc allocates a byte slice of the specified size, possibly reusing -// previously allocated but unused memory. -func (c *Cache) Alloc(n int) []byte { - return allocNew(n) +func checkValue(obj interface{}) { + v := obj.(*Value) + if v.buf != nil { + fmt.Fprintf(os.Stderr, "%p: was not freed: refs=%d\n%s", + v.buf, atomic.LoadInt32(&v.refs), strings.Join(v.traces, "\n")) + os.Exit(1) + } +} + +// AllocManual allocates a byte slice of the specified size, possibly reusing +// previously allocated but unused memory. The memory backing the value is +// manually managed. The caller MUST either add the value to the cache (via +// Cache.Set), or release the value (via Cache.Free). Failure to do so will +// result in a memory leak. +func (c *Cache) AllocManual(n int) *Value { + v := newValue(allocNew(n)) + if invariants.Enabled { + v.trace("alloc") + runtime.SetFinalizer(v, checkValue) + } + return v } -// Free frees the specified slice of memory. The buffer will possibly be -// reused, making it invalid to use the buffer after calling Free. -func (c *Cache) Free(b []byte) { - allocFree(b) +// AllocAuto allocates an automatically managed value using buf as the internal +// buffer. +func (c *Cache) AllocAuto(buf []byte) *Value { + v := newValue(buf) + v.auto = true + return v +} + +// Free frees the specified value. The buffer associated with the value will +// possibly be reused, making it invalid to use the buffer after calling +// Free. Free is a blunt instrument and will blindly release the buffer. Do not +// call Free on a value that has been added to the cache. +func (c *Cache) Free(v *Value) { + if n := atomic.LoadInt32(&v.refs); n != -1 { + panic(fmt.Sprintf("pebble: Value has been added to the cache: refs=%d", n)) + } + if !v.auto { + allocFree(v.buf) + v.buf = nil + } } // Reserve N bytes in the cache. 
This effectively shrinks the size of the cache diff --git a/internal/cache/clockpro_test.go b/internal/cache/clockpro_test.go index 5c535092ddf..bf6bc4301c4 100644 --- a/internal/cache/clockpro_test.go +++ b/internal/cache/clockpro_test.go @@ -37,7 +37,9 @@ func TestCache(t *testing.T) { var hit bool h := cache.Get(1, uint64(key), 0) if v := h.Get(); v == nil { - cache.Set(1, uint64(key), 0, append([]byte(nil), fields[0][0])) + value := cache.AllocManual(1) + value.Buf()[0] = fields[0][0] + cache.Set(1, uint64(key), 0, value).Release() } else { hit = true if !bytes.Equal(v, fields[0][:1]) { @@ -52,10 +54,21 @@ func TestCache(t *testing.T) { } } +func testManualValue(cache *Cache, s string, repeat int) *Value { + b := bytes.Repeat([]byte(s), repeat) + v := cache.AllocManual(len(b)) + copy(v.Buf(), b) + return v +} + +func testAutoValue(cache *Cache, s string, repeat int) *Value { + return cache.AllocAuto(bytes.Repeat([]byte(s), repeat)) +} + func TestWeakHandle(t *testing.T) { cache := newShards(5, 1) - cache.Set(1, 1, 0, bytes.Repeat([]byte("a"), 5)) - h := cache.Set(1, 0, 0, bytes.Repeat([]byte("b"), 5)) + cache.Set(1, 1, 0, testAutoValue(cache, "a", 5)).Release() + h := cache.Set(1, 0, 0, testAutoValue(cache, "b", 5)) if v := h.Get(); string(v) != "bbbbb" { t.Fatalf("expected bbbbb, but found %v", v) } @@ -64,7 +77,7 @@ func TestWeakHandle(t *testing.T) { if v := w.Get(); string(v) != "bbbbb" { t.Fatalf("expected bbbbb, but found %v", v) } - cache.Set(1, 2, 0, bytes.Repeat([]byte("a"), 5)) + cache.Set(1, 2, 0, testManualValue(cache, "a", 5)).Release() if v := w.Get(); v != nil { t.Fatalf("expected nil, but found %s", v) } @@ -72,9 +85,9 @@ func TestWeakHandle(t *testing.T) { func TestCacheDelete(t *testing.T) { cache := newShards(100, 1) - cache.Set(1, 0, 0, bytes.Repeat([]byte("a"), 5)) - cache.Set(1, 1, 0, bytes.Repeat([]byte("a"), 5)) - cache.Set(1, 2, 0, bytes.Repeat([]byte("a"), 5)) + cache.Set(1, 0, 0, testManualValue(cache, "a", 5)).Release() + cache.Set(1, 1, 0, testManualValue(cache, "a", 5)).Release() + cache.Set(1, 2, 0, testManualValue(cache, "a", 5)).Release() if expected, size := int64(15), cache.Size(); expected != size { t.Fatalf("expected cache size %d, but found %d", expected, size) } @@ -84,9 +97,13 @@ func TestCacheDelete(t *testing.T) { } if h := cache.Get(1, 0, 0); h.Get() == nil { t.Fatalf("expected to find block 0/0") + } else { + h.Release() } if h := cache.Get(1, 1, 0); h.Get() != nil { t.Fatalf("expected to not find block 1/0") + } else { + h.Release() } // Deleting a non-existing block does nothing. cache.Delete(1, 1, 0) @@ -97,11 +114,11 @@ func TestCacheDelete(t *testing.T) { func TestEvictFile(t *testing.T) { cache := newShards(100, 1) - cache.Set(1, 0, 0, bytes.Repeat([]byte("a"), 5)) - cache.Set(1, 1, 0, bytes.Repeat([]byte("a"), 5)) - cache.Set(1, 2, 0, bytes.Repeat([]byte("a"), 5)) - cache.Set(1, 2, 1, bytes.Repeat([]byte("a"), 5)) - cache.Set(1, 2, 2, bytes.Repeat([]byte("a"), 5)) + cache.Set(1, 0, 0, testManualValue(cache, "a", 5)).Release() + cache.Set(1, 1, 0, testManualValue(cache, "a", 5)).Release() + cache.Set(1, 2, 0, testManualValue(cache, "a", 5)).Release() + cache.Set(1, 2, 1, testManualValue(cache, "a", 5)).Release() + cache.Set(1, 2, 2, testManualValue(cache, "a", 5)).Release() if expected, size := int64(25), cache.Size(); expected != size { t.Fatalf("expected cache size %d, but found %d", expected, size) } @@ -123,14 +140,14 @@ func TestEvictAll(t *testing.T) { // Verify that it is okay to evict all of the data from a cache. 
Previously // this would trigger a nil-pointer dereference. cache := newShards(100, 1) - cache.Set(1, 0, 0, bytes.Repeat([]byte("a"), 101)) - cache.Set(1, 1, 0, bytes.Repeat([]byte("a"), 101)) + cache.Set(1, 0, 0, testManualValue(cache, "a", 101)).Release() + cache.Set(1, 1, 0, testManualValue(cache, "a", 101)).Release() } func TestMultipleDBs(t *testing.T) { cache := newShards(100, 1) - cache.Set(1, 0, 0, bytes.Repeat([]byte("a"), 5)) - cache.Set(2, 0, 0, bytes.Repeat([]byte("b"), 5)) + cache.Set(1, 0, 0, testManualValue(cache, "a", 5)).Release() + cache.Set(2, 0, 0, testManualValue(cache, "b", 5)).Release() if expected, size := int64(10), cache.Size(); expected != size { t.Fatalf("expected cache size %d, but found %d", expected, size) } @@ -144,31 +161,31 @@ func TestMultipleDBs(t *testing.T) { } h = cache.Get(2, 0, 0) if v := h.Get(); string(v) != "bbbbb" { - t.Fatalf("expected bbbbb, but found %v", v) + t.Fatalf("expected bbbbb, but found %s", v) } } func TestZeroSize(t *testing.T) { cache := newShards(0, 1) - cache.Set(1, 0, 0, bytes.Repeat([]byte("a"), 5)) + cache.Set(1, 0, 0, testManualValue(cache, "a", 5)).Release() } func TestReserve(t *testing.T) { cache := newShards(4, 2) - cache.Set(1, 0, 0, []byte("a")) - cache.Set(2, 0, 0, []byte("a")) + cache.Set(1, 0, 0, testManualValue(cache, "a", 1)).Release() + cache.Set(2, 0, 0, testManualValue(cache, "a", 1)).Release() require.EqualValues(t, 2, cache.Size()) r := cache.Reserve(1) require.EqualValues(t, 0, cache.Size()) - cache.Set(1, 0, 0, []byte("a")) - cache.Set(2, 0, 0, []byte("a")) - cache.Set(3, 0, 0, []byte("a")) - cache.Set(4, 0, 0, []byte("a")) + cache.Set(1, 0, 0, testManualValue(cache, "a", 1)).Release() + cache.Set(2, 0, 0, testManualValue(cache, "a", 1)).Release() + cache.Set(3, 0, 0, testManualValue(cache, "a", 1)).Release() + cache.Set(4, 0, 0, testManualValue(cache, "a", 1)).Release() require.EqualValues(t, 2, cache.Size()) r() require.EqualValues(t, 2, cache.Size()) - cache.Set(1, 0, 0, []byte("a")) - cache.Set(2, 0, 0, []byte("a")) + cache.Set(1, 0, 0, testManualValue(cache, "a", 1)).Release() + cache.Set(2, 0, 0, testManualValue(cache, "a", 1)).Release() require.EqualValues(t, 4, cache.Size()) } diff --git a/internal/manual/manual.go b/internal/manual/manual.go new file mode 100644 index 00000000000..a88d3abc7e7 --- /dev/null +++ b/internal/manual/manual.go @@ -0,0 +1,35 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package manual + +// #include +import "C" +import "unsafe" + +// TODO(peter): Rather than relying an C malloc/free, we could fork the Go +// runtime page allocator and allocate large chunks of memory using mmap or +// similar. + +// New allocates a slice of size n. The returned slice is from manually managed +// memory and MUST be released by calling Free. Failure to do so will result in +// a memory leak. +func New(n int) []byte { + if n == 0 { + return make([]byte, 0) + } + ptr := C.malloc(C.size_t(n)) + // Interpret the C pointer as a pointer to a Go array, then slice. + return (*[maxArrayLen]byte)(unsafe.Pointer(ptr))[:n:n] +} + +// Free frees the specified slice. 
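+// Freeing a nil or zero-capacity slice is a no-op. A zero-length slice with
+// non-zero capacity is still freed correctly: it is re-extended to its full
+// capacity before taking the address of its first element, so the base
+// pointer originally returned by C.malloc in New is the one handed to C.free.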
+func Free(b []byte) { + if cap(b) != 0 { + if len(b) == 0 { + b = b[:cap(b)] + } + C.free(unsafe.Pointer(&b[0])) + } +} diff --git a/internal/manual/manual_32bit.go b/internal/manual/manual_32bit.go new file mode 100644 index 00000000000..3c39ea053b4 --- /dev/null +++ b/internal/manual/manual_32bit.go @@ -0,0 +1,12 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +// +build 386 amd64p32 arm armbe mips mipsle mips64p32 mips64p32le ppc sparc + +package manual + +const ( + // maxArrayLen is a safe maximum length for slices on this architecture. + maxArrayLen = 1<<31 - 1 +) diff --git a/internal/manual/manual_64bit.go b/internal/manual/manual_64bit.go new file mode 100644 index 00000000000..23d1a28d736 --- /dev/null +++ b/internal/manual/manual_64bit.go @@ -0,0 +1,12 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +// +build amd64 arm64 arm64be ppc64 ppc64le mips64 mips64le s390x sparc64 + +package manual + +const ( + // maxArrayLen is a safe maximum length for slices on this architecture. + maxArrayLen = 1<<50 - 1 +) diff --git a/internal/manual/manual_nocgo.go b/internal/manual/manual_nocgo.go new file mode 100644 index 00000000000..dff844d94a9 --- /dev/null +++ b/internal/manual/manual_nocgo.go @@ -0,0 +1,19 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +// +build !cgo + +package manual + +// Provides versions of manualNew and manualFree when cgo is not available +// (e.g. cross compilation). + +// New allocates a slice of size n. +func New(n int) []byte { + return make([]byte, n) +} + +// Free frees the specified slice. +func Free(b []byte) { +} diff --git a/mem_table.go b/mem_table.go index e92b170981a..2ed569eb13a 100644 --- a/mem_table.go +++ b/mem_table.go @@ -116,6 +116,9 @@ func newMemTable(opts memTableOptions) *memTable { m.releaseMemAccounting = opts.memAccounting(opts.size) } + // TODO(peter): Manually manage the arena memory. There should be no + // difficulty in doing so as we already precisely track when a memtable goes + // "out of scope" for memory accounting. arena := arenaskl.NewArena(uint32(opts.size)) m.skl.Reset(arena, m.cmp) m.rangeDelSkl.Reset(arena, m.cmp) diff --git a/sstable/block.go b/sstable/block.go index fdf034b9069..b18123b7088 100644 --- a/sstable/block.go +++ b/sstable/block.go @@ -267,6 +267,12 @@ func (i *blockIter) init(cmp Compare, block block, globalSeqNum uint64) error { return nil } +func (i *blockIter) initHandle(cmp Compare, block cache.Handle, globalSeqNum uint64) error { + i.cacheHandle.Release() + i.cacheHandle = block + return i.init(cmp, block.Get(), globalSeqNum) +} + func (i *blockIter) invalidate() { i.clearCache() i.offset = 0 @@ -284,11 +290,6 @@ func (i *blockIter) resetForReuse() blockIter { } } -func (i *blockIter) setCacheHandle(h cache.Handle) { - i.cacheHandle.Release() - i.cacheHandle = h -} - func (i *blockIter) readEntry() { ptr := unsafe.Pointer(uintptr(i.ptr) + uintptr(i.offset)) @@ -832,6 +833,7 @@ func (i *blockIter) Error() error { // package. 
func (i *blockIter) Close() error { i.cacheHandle.Release() + i.cacheHandle = cache.Handle{} i.val = nil return i.err } diff --git a/sstable/data_test.go b/sstable/data_test.go index 777aaf5116b..05b956f9b8a 100644 --- a/sstable/data_test.go +++ b/sstable/data_test.go @@ -177,6 +177,7 @@ func runIterCmd(td *datadriven.TestData, r *Reader) string { if err := iter.Error(); err != nil { return err.Error() } + defer iter.Close() var b bytes.Buffer var prefix []byte diff --git a/sstable/reader.go b/sstable/reader.go index 0bf5745a367..7a0baf2ca4a 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -10,6 +10,8 @@ import ( "errors" "fmt" "io" + "os" + "runtime" "sort" "sync" "unsafe" @@ -17,6 +19,7 @@ import ( "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/cache" "github.com/cockroachdb/pebble/internal/crc" + "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/private" "github.com/cockroachdb/pebble/internal/rangedel" "github.com/cockroachdb/pebble/vfs" @@ -80,16 +83,48 @@ var _ base.InternalIterator = (*singleLevelIterator)(nil) var singleLevelIterPool = sync.Pool{ New: func() interface{} { - return &singleLevelIterator{} + i := &singleLevelIterator{} + if invariants.Enabled { + runtime.SetFinalizer(i, checkSingleLevelIterator) + } + return i }, } var twoLevelIterPool = sync.Pool{ New: func() interface{} { - return &twoLevelIterator{} + i := &twoLevelIterator{} + if invariants.Enabled { + runtime.SetFinalizer(i, checkTwoLevelIterator) + } + return i }, } +func checkSingleLevelIterator(obj interface{}) { + i := obj.(*singleLevelIterator) + if p := i.data.cacheHandle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "singleLevelIterator.data.cacheHandle is not nil: %p\n", p) + os.Exit(1) + } + if p := i.index.cacheHandle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "singleLevelIterator.index.cacheHandle is not nil: %p\n", p) + os.Exit(1) + } +} + +func checkTwoLevelIterator(obj interface{}) { + i := obj.(*twoLevelIterator) + if p := i.data.cacheHandle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "singleLevelIterator.data.cacheHandle is not nil: %p\n", p) + os.Exit(1) + } + if p := i.index.cacheHandle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "singleLevelIterator.index.cacheHandle is not nil: %p\n", p) + os.Exit(1) + } +} + // Init initializes a singleLevelIterator for reading from the table. It is // synonmous with Reader.NewIter, but allows for reusing of the iterator // between different Readers. @@ -159,13 +194,12 @@ func (i *singleLevelIterator) loadBlock() bool { i.err = errCorruptIndexEntry return false } - block, err := i.reader.readBlock(i.dataBH, nil /* transform */) + block, err := i.reader.readBlock(i.dataBH, nil /* transform */, false /* weak */) if err != nil { i.err = err return false } - i.data.setCacheHandle(block) - i.err = i.data.init(i.cmp, block.Get(), i.reader.Properties.GlobalSeqNum) + i.err = i.data.initHandle(i.cmp, block, i.reader.Properties.GlobalSeqNum) if i.err != nil { return false } @@ -441,18 +475,23 @@ func (i *singleLevelIterator) SetCloseHook(fn func(i Iterator) error) { i.closeHook = fn } +func firstError(err0, err1 error) error { + if err0 != nil { + return err0 + } + return err1 +} + // Close implements internalIterator.Close, as documented in the pebble // package. 
func (i *singleLevelIterator) Close() error { + var err error if i.closeHook != nil { - if err := i.closeHook(i); err != nil { - return err - } - } - if err := i.data.Close(); err != nil { - return err + err = firstError(err, i.closeHook(i)) } - err := i.err + err = firstError(err, i.data.Close()) + err = firstError(err, i.index.Close()) + err = firstError(err, i.err) *i = i.resetForReuse() singleLevelIterPool.Put(i) return err @@ -559,13 +598,12 @@ func (i *twoLevelIterator) loadIndex() bool { i.err = errors.New("pebble/table: corrupt top level index entry") return false } - indexBlock, err := i.reader.readBlock(h, nil /* transform */) + indexBlock, err := i.reader.readBlock(h, nil /* transform */, false /* weak */) if err != nil { i.err = err return false } - i.index.setCacheHandle(indexBlock) - i.err = i.index.init(i.cmp, indexBlock.Get(), i.reader.Properties.GlobalSeqNum) + i.err = i.index.initHandle(i.cmp, indexBlock, i.reader.Properties.GlobalSeqNum) return i.err == nil } @@ -784,15 +822,13 @@ func (i *twoLevelIterator) skipBackward() (*InternalKey, []byte) { // Close implements internalIterator.Close, as documented in the pebble // package. func (i *twoLevelIterator) Close() error { + var err error if i.closeHook != nil { - if err := i.closeHook(i); err != nil { - return err - } - } - if err := i.data.Close(); err != nil { - return err + err = firstError(err, i.closeHook(i)) } - err := i.err + err = firstError(err, i.data.Close()) + err = firstError(err, i.index.Close()) + err = firstError(err, i.err) *i = twoLevelIterator{ singleLevelIterator: i.singleLevelIterator.resetForReuse(), topLevelIndex: i.topLevelIndex.resetForReuse(), @@ -812,6 +848,10 @@ type twoLevelCompactionIterator struct { // twoLevelCompactionIterator implements the base.InternalIterator interface. var _ base.InternalIterator = (*twoLevelCompactionIterator)(nil) +func (i *twoLevelCompactionIterator) Close() error { + return i.twoLevelIterator.Close() +} + func (i *twoLevelCompactionIterator) SeekGE(key []byte) (*InternalKey, []byte) { panic("pebble: SeekGE unimplemented") } @@ -874,7 +914,7 @@ func (i *twoLevelCompactionIterator) skipForward( type weakCachedBlock struct { bh BlockHandle mu sync.RWMutex - handle cache.WeakHandle + handle *cache.WeakHandle } type blockTransform func([]byte) ([]byte, error) @@ -1040,7 +1080,15 @@ func (r *Reader) get(key []byte) (value []byte, err error) { } return nil, err } - return value, i.Close() + + // The value will be "freed" when the iterator is closed, so make a copy + // which will outlast the lifetime of the iterator. + newValue := make([]byte, len(value)) + copy(newValue, value) + if err := i.Close(); err != nil { + return nil, err + } + return newValue, nil } // NewIter returns an iterator for the contents of the table. @@ -1121,7 +1169,7 @@ func (r *Reader) readWeakCachedBlock(w *weakCachedBlock, transform blockTransfor // Slow-path: read the index block from disk. This checks the cache again, // but that is ok because somebody else might have inserted it for us. - h, err := r.readBlock(w.bh, transform) + h, err := r.readBlock(w.bh, transform, true /* weak */) if err != nil { return nil, err } @@ -1135,24 +1183,35 @@ func (r *Reader) readWeakCachedBlock(w *weakCachedBlock, transform blockTransfor } // readBlock reads and decompresses a block from disk into memory. 
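+// The new weak parameter indicates that the caller intends to reference the
+// block through a WeakHandle. Weak handles rely on the Go runtime to
+// eventually GC the value, so such blocks are allocated with Cache.AllocAuto;
+// all other blocks use the manually managed Cache.AllocManual path and their
+// buffers are returned to the cache's allocator when the reference count
+// drops to zero.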
-func (r *Reader) readBlock(bh BlockHandle, transform blockTransform) (cache.Handle, error) { +func (r *Reader) readBlock( + bh BlockHandle, transform blockTransform, weak bool, +) (cache.Handle, error) { if h := r.opts.Cache.Get(r.cacheID, r.fileNum, bh.Offset); h.Get() != nil { return h, nil } - b := r.opts.Cache.Alloc(int(bh.Length + blockTrailerLen)) + var v *cache.Value + if weak { + v = r.opts.Cache.AllocAuto(make([]byte, int(bh.Length+blockTrailerLen))) + } else { + v = r.opts.Cache.AllocManual(int(bh.Length + blockTrailerLen)) + } + b := v.Buf() if _, err := r.file.ReadAt(b, int64(bh.Offset)); err != nil { + r.opts.Cache.Free(v) return cache.Handle{}, err } checksum0 := binary.LittleEndian.Uint32(b[bh.Length+1:]) checksum1 := crc.New(b[:bh.Length+1]).Value() if checksum0 != checksum1 { + r.opts.Cache.Free(v) return cache.Handle{}, errors.New("pebble/table: invalid table (checksum mismatch)") } typ := b[bh.Length] b = b[:bh.Length] + v.Truncate(len(b)) switch typ { case noCompressionBlockType: @@ -1160,29 +1219,55 @@ func (r *Reader) readBlock(bh BlockHandle, transform blockTransform) (cache.Hand case snappyCompressionBlockType: decodedLen, err := snappy.DecodedLen(b) if err != nil { + r.opts.Cache.Free(v) return cache.Handle{}, err } - decoded := r.opts.Cache.Alloc(decodedLen) - decoded, err = snappy.Decode(decoded, b) + var decoded *cache.Value + if weak { + decoded = r.opts.Cache.AllocAuto(make([]byte, decodedLen)) + } else { + decoded = r.opts.Cache.AllocManual(decodedLen) + } + decodedBuf := decoded.Buf() + result, err := snappy.Decode(decodedBuf, b) + r.opts.Cache.Free(v) if err != nil { + r.opts.Cache.Free(decoded) return cache.Handle{}, err } - r.opts.Cache.Free(b) - b = decoded + if len(result) != 0 && + (len(result) != len(decodedBuf) || &result[0] != &decodedBuf[0]) { + r.opts.Cache.Free(decoded) + return cache.Handle{}, fmt.Errorf("pebble/table: snappy decoded into unexpected buffer: %p != %p", + result, decodedBuf) + } + v, b = decoded, decodedBuf default: + r.opts.Cache.Free(v) return cache.Handle{}, fmt.Errorf("pebble/table: unknown block compression: %d", typ) } if transform != nil { - // Transforming blocks is rare, so we don't bother to use cache.Alloc. + // Transforming blocks is rare, so the extra copy of the transformed data + // is not problematic. 
var err error b, err = transform(b) if err != nil { + r.opts.Cache.Free(v) return cache.Handle{}, err } + var newV *cache.Value + if weak { + newV = r.opts.Cache.AllocAuto(b) + } else { + newV = r.opts.Cache.AllocManual(len(b)) + copy(newV.Buf(), b) + } + r.opts.Cache.Free(v) + v = newV } - h := r.opts.Cache.Set(r.cacheID, r.fileNum, bh.Offset, b) + h := r.opts.Cache.Set(r.cacheID, r.fileNum, bh.Offset, v) return h, nil } @@ -1229,18 +1314,19 @@ func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) { } func (r *Reader) readMetaindex(metaindexBH BlockHandle) error { - b, err := r.readBlock(metaindexBH, nil /* transform */) + b, err := r.readBlock(metaindexBH, nil /* transform */, false /* weak */) if err != nil { return err } data := b.Get() + defer b.Release() + if uint64(len(data)) != metaindexBH.Length { return fmt.Errorf("pebble/table: unexpected metaindex block size: %d vs %d", len(data), metaindexBH.Length) } i, err := newRawBlockIter(bytes.Compare, data) - b.Release() if err != nil { return err } @@ -1258,13 +1344,12 @@ func (r *Reader) readMetaindex(metaindexBH BlockHandle) error { } if bh, ok := meta[metaPropertiesName]; ok { - b, err = r.readBlock(bh, nil /* transform */) + b, err = r.readBlock(bh, nil /* transform */, false /* weak */) if err != nil { return err } - data := b.Get() r.propertiesBH = bh - err := r.Properties.load(data, bh.Offset) + err := r.Properties.load(b.Get(), bh.Offset) b.Release() if err != nil { return err @@ -1350,7 +1435,7 @@ func (r *Reader) Layout() (*Layout, error) { } l.Index = append(l.Index, indexBH) - subIndex, err := r.readBlock(indexBH, nil /* transform */) + subIndex, err := r.readBlock(indexBH, nil /* transform */, false /* weak */) if err != nil { return nil, err } @@ -1415,7 +1500,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { if n == 0 || n != len(val) { return 0, errCorruptIndexEntry } - startIdxBlock, err := r.readBlock(startIdxBH, nil /* transform */) + startIdxBlock, err := r.readBlock(startIdxBH, nil /* transform */, false /* weak */) if err != nil { return 0, err } @@ -1434,7 +1519,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { if n == 0 || n != len(val) { return 0, errCorruptIndexEntry } - endIdxBlock, err := r.readBlock(endIdxBH, nil /* transform */) + endIdxBlock, err := r.readBlock(endIdxBH, nil /* transform */, false /* weak */) if err != nil { return 0, err } @@ -1616,7 +1701,7 @@ func (l *Layout) Describe( continue } - h, err := r.readBlock(b.BlockHandle, nil /* transform */) + h, err := r.readBlock(b.BlockHandle, nil /* transform */, false /* weak */) if err != nil { fmt.Fprintf(w, " [err: %s]\n", err) continue diff --git a/sstable/reader_test.go b/sstable/reader_test.go index cc6a8de2c0f..99cfdee284e 100644 --- a/sstable/reader_test.go +++ b/sstable/reader_test.go @@ -343,6 +343,7 @@ func TestBytesIteratedCompressed(t *testing.T) { r := buildTestTable(t, numEntries, blockSize, indexBlockSize, SnappyCompression) var bytesIterated, prevIterated uint64 citer := r.NewCompactionIter(&bytesIterated) + for key, _ := citer.First(); key != nil; key, _ = citer.Next() { if bytesIterated < prevIterated { t.Fatalf("bytesIterated moved backward: %d < %d", bytesIterated, prevIterated) @@ -355,6 +356,13 @@ func TestBytesIteratedCompressed(t *testing.T) { if bytesIterated < expected*99/100 || bytesIterated > expected*101/100 { t.Fatalf("bytesIterated: got %d, want %d", bytesIterated, expected) } + + if err := citer.Close(); err != nil { + t.Fatal(err) + } + if err := 
r.Close(); err != nil { + t.Fatal(err) + } } } } @@ -368,6 +376,7 @@ func TestBytesIteratedUncompressed(t *testing.T) { r := buildTestTable(t, numEntries, blockSize, indexBlockSize, NoCompression) var bytesIterated, prevIterated uint64 citer := r.NewCompactionIter(&bytesIterated) + for key, _ := citer.First(); key != nil; key, _ = citer.Next() { if bytesIterated < prevIterated { t.Fatalf("bytesIterated moved backward: %d < %d", bytesIterated, prevIterated) @@ -380,6 +389,13 @@ func TestBytesIteratedUncompressed(t *testing.T) { t.Fatalf("bytesIterated: got %d, want %d (blockSize=%d indexBlockSize=%d numEntries=%d)", bytesIterated, expected, blockSize, indexBlockSize, numEntries) } + + if err := citer.Close(); err != nil { + t.Fatal(err) + } + if err := r.Close(); err != nil { + t.Fatal(err) + } } } } @@ -483,6 +499,8 @@ func BenchmarkTableIterSeekGE(b *testing.B) { for i := 0; i < b.N; i++ { it.SeekGE(keys[rng.Intn(len(keys))]) } + + it.Close() }) } } @@ -501,6 +519,8 @@ func BenchmarkTableIterSeekLT(b *testing.B) { for i := 0; i < b.N; i++ { it.SeekLT(keys[rng.Intn(len(keys))]) } + + it.Close() }) } } @@ -527,6 +547,8 @@ func BenchmarkTableIterNext(b *testing.B) { if testing.Verbose() { fmt.Fprint(ioutil.Discard, sum) } + + it.Close() }) } } @@ -553,6 +575,8 @@ func BenchmarkTableIterPrev(b *testing.B) { if testing.Verbose() { fmt.Fprint(ioutil.Discard, sum) } + + it.Close() }) } } diff --git a/sstable/table_test.go b/sstable/table_test.go index 4404cf52ed5..c0fa57e0c68 100644 --- a/sstable/table_test.go +++ b/sstable/table_test.go @@ -673,6 +673,9 @@ func TestReaderGlobalSeqNum(t *testing.T) { t.Fatalf("expected %d, but found %d", globalSeqNum, i.Key().SeqNum()) } } + if err := i.Close(); err != nil { + t.Fatal(err) + } } func TestMetaIndexEntriesSorted(t *testing.T) { @@ -687,12 +690,12 @@ func TestMetaIndexEntriesSorted(t *testing.T) { t.Fatal(err) } - b, err := r.readBlock(r.metaIndexBH, nil /* transform */) + b, err := r.readBlock(r.metaIndexBH, nil /* transform */, false /* weak */) if err != nil { t.Fatal(err) } i, err := newRawBlockIter(bytes.Compare, b.Get()) - b.Release() + defer b.Release() if err != nil { t.Fatal(err) } diff --git a/sstable/writer_test.go b/sstable/writer_test.go index 6dbd4413da8..64d1306c54b 100644 --- a/sstable/writer_test.go +++ b/sstable/writer_test.go @@ -99,7 +99,12 @@ func TestWriterClearCache(t *testing.T) { opts := ReaderOptions{Cache: cache.New(64 << 20)} writerOpts := WriterOptions{Cache: opts.Cache} cacheOpts := &cacheOpts{cacheID: 1, fileNum: 1} - invalidData := []byte("invalid data") + invalidData := func() *cache.Value { + invalid := []byte("invalid data") + v := opts.Cache.AllocManual(len(invalid)) + copy(v.Buf(), invalid) + return v + } build := func(name string) { f, err := mem.Create(name) @@ -149,7 +154,7 @@ func TestWriterClearCache(t *testing.T) { // Poison the cache for each of the blocks. 
poison := func(bh BlockHandle) { - opts.Cache.Set(cacheOpts.cacheID, cacheOpts.fileNum, bh.Offset, invalidData) + opts.Cache.Set(cacheOpts.cacheID, cacheOpts.fileNum, bh.Offset, invalidData()).Release() } foreachBH(layout, poison) @@ -161,7 +166,7 @@ func TestWriterClearCache(t *testing.T) { check := func(bh BlockHandle) { h := opts.Cache.Get(cacheOpts.cacheID, cacheOpts.fileNum, bh.Offset) if h.Get() != nil { - t.Fatalf("%d: expected cache to be cleared, but found %q", bh.Offset, invalidData) + t.Fatalf("%d: expected cache to be cleared, but found %q", bh.Offset, h.Get()) } } foreachBH(layout, check) diff --git a/testdata/metrics b/testdata/metrics index 051624895a8..73a5deadd05 100644 --- a/testdata/metrics +++ b/testdata/metrics @@ -33,7 +33,7 @@ compact 0 826 B (size == estimated-debt) zmemtbl 1 256 K ztbl 0 0 B bcache 3 732 B 0.0% (score == hit-rate) - tcache 1 688 B 0.0% (score == hit-rate) + tcache 1 664 B 0.0% (score == hit-rate) titers 1 filter - - 0.0% (score == utility) @@ -130,7 +130,7 @@ compact 1 0 B (size == estimated-debt) zmemtbl 1 256 K ztbl 1 826 B bcache 4 753 B 27.3% (score == hit-rate) - tcache 1 688 B 60.0% (score == hit-rate) + tcache 1 664 B 60.0% (score == hit-rate) titers 1 filter - - 0.0% (score == utility)
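Illustrative sketch, not part of the patch: the caller-side lifecycle of a manually managed cache value under the new API, assuming the code is compiled inside the pebble module (internal/cache is an internal package). The surrounding main function is hypothetical; New, NewID, AllocManual, Buf, Set, Get, Handle.Release and Free are the identifiers introduced above.

package main

import "github.com/cockroachdb/pebble/internal/cache"

func main() {
	c := cache.New(32 << 20)
	id := c.NewID()

	// AllocManual returns a *cache.Value backed by manually managed memory;
	// the caller MUST either add it to the cache via Set or release it via
	// Free, otherwise the C-allocated buffer leaks.
	v := c.AllocManual(5)
	copy(v.Buf(), "hello")

	// Set takes one reference for the cache and returns a Handle holding a
	// second; Release drops the Handle's reference once the caller is done.
	c.Set(id, 1 /* fileNum */, 0 /* offset */, v).Release()

	// Readers acquire a reference via Get and must balance it with Release.
	if h := c.Get(id, 1, 0); h.Get() != nil {
		// The bytes returned by h.Get() remain valid until h.Release().
		h.Release()
	}

	// A value that never makes it into the cache is returned with Free.
	tmp := c.AllocManual(8)
	c.Free(tmp)
}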