Skip to content

Commit

Permalink
internal/batchskl: return error on index overflow
Browse files Browse the repository at this point in the history
Currently, the raw data for a skiplist node is stored in a byte slice,
which is indexed into with a `uint32`. In situations where a large
number of nodes are added to the skiplist (on the order of 100 M), the
addition of a node can result in integer overflow which results in a
runtime panic.

Rather than panic, check that the addition of a new node would not
result in an overflow. In the situation where an overflow would occur,
return an error to the caller.

Propagate potential errors for the skip list to external callers. Prior
to this, it was assumed that the addition of a node would never return
an error and the top level methods would panic. Exposing the error
allows external callers to make a determination at runtime as to how the
overflow should be handled.

The altered implementation is slightly faster:

```
name                   old time/op  new time/op  delta
ReadWrite/frac_0-16     724ns ±17%   690ns ± 3%  -4.65%  (p=0.016 n=20+19)
ReadWrite/frac_10-16    694ns ± 3%   687ns ± 2%  -1.03%  (p=0.010 n=17+16)
ReadWrite/frac_20-16    682ns ± 7%   694ns ±15%    ~     (p=0.641 n=17+20)
ReadWrite/frac_30-16    655ns ± 6%   658ns ± 5%    ~     (p=0.443 n=17+18)
ReadWrite/frac_40-16    636ns ±14%   655ns ±10%    ~     (p=0.271 n=18+19)
ReadWrite/frac_50-16    611ns ±11%   594ns ± 4%  -2.87%  (p=0.013 n=18+20)
ReadWrite/frac_60-16    595ns ±15%   559ns ± 3%  -6.03%  (p=0.004 n=20+20)
ReadWrite/frac_70-16    546ns ±18%   497ns ± 2%  -8.93%  (p=0.000 n=19+20)
ReadWrite/frac_80-16    419ns ± 5%   428ns ± 3%  +2.19%  (p=0.000 n=18+20)
ReadWrite/frac_90-16    327ns ± 7%   333ns ± 3%    ~     (p=0.055 n=20+20)
ReadWrite/frac_100-16  21.9ns ± 5%  21.9ns ± 5%    ~     (p=0.569 n=20+20)
```

Closes #1258.
  • Loading branch information
nicktrav committed Feb 1, 2022
1 parent 8440f29 commit 38b68e1
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 39 deletions.
33 changes: 12 additions & 21 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ type DeferredBatchOp struct {
// have been filled into Key and Value. Not calling Finish or not
// copying/encoding keys will result in an incomplete index, and calling Finish
// twice may result in a panic.
func (d DeferredBatchOp) Finish() {
func (d DeferredBatchOp) Finish() error {
if d.index != nil {
if err := d.index.Add(d.offset); err != nil {
// We never add duplicate entries, so an error should never occur.
panic(err)
return err
}
}
return nil
}

// A Batch is a sequence of Sets, Merges, Deletes, DeleteRanges, RangeKeySets,
Expand Down Expand Up @@ -394,8 +394,7 @@ func (b *Batch) Apply(batch *Batch, _ *WriteOptions) error {
err = b.index.Add(uint32(offset))
}
if err != nil {
// We never add duplicate entries, so an error should never occur.
panic(err)
return err
}
}
b.memTableSize += memTableEntrySize(len(key), len(value))
Expand Down Expand Up @@ -512,8 +511,7 @@ func (b *Batch) Set(key, value []byte, _ *WriteOptions) error {
// in go1.13 will remove the need for this.
if b.index != nil {
if err := b.index.Add(deferredOp.offset); err != nil {
// We never add duplicate entries, so an error should never occur.
panic(err)
return err
}
}
return nil
Expand Down Expand Up @@ -542,8 +540,7 @@ func (b *Batch) Merge(key, value []byte, _ *WriteOptions) error {
// in go1.13 will remove the need for this.
if b.index != nil {
if err := b.index.Add(deferredOp.offset); err != nil {
// We never add duplicate entries, so an error should never occur.
panic(err)
return err
}
}
return nil
Expand All @@ -569,8 +566,7 @@ func (b *Batch) Delete(key []byte, _ *WriteOptions) error {
// in go1.13 will remove the need for this.
if b.index != nil {
if err := b.index.Add(deferredOp.offset); err != nil {
// We never add duplicate entries, so an error should never occur.
panic(err)
return err
}
}
return nil
Expand All @@ -597,8 +593,7 @@ func (b *Batch) SingleDelete(key []byte, _ *WriteOptions) error {
// in go1.13 will remove the need for this.
if b.index != nil {
if err := b.index.Add(deferredOp.offset); err != nil {
// We never add duplicate entries, so an error should never occur.
panic(err)
return err
}
}
return nil
Expand Down Expand Up @@ -628,8 +623,7 @@ func (b *Batch) DeleteRange(start, end []byte, _ *WriteOptions) error {
// in go1.13 will remove the need for this.
if deferredOp.index != nil {
if err := deferredOp.index.Add(deferredOp.offset); err != nil {
// We never add duplicate entries, so an error should never occur.
panic(err)
return err
}
}
return nil
Expand Down Expand Up @@ -679,8 +673,7 @@ func (b experimentalBatch) RangeKeySet(start, end, suffix, value []byte, _ *Writ
// Manually inline DeferredBatchOp.Finish().
if deferredOp.index != nil {
if err := deferredOp.index.Add(deferredOp.offset); err != nil {
// We never add duplicate entries, so an error should never occur.
panic(err)
return err
}
}
return nil
Expand Down Expand Up @@ -719,8 +712,7 @@ func (b experimentalBatch) RangeKeyUnset(start, end, suffix []byte, _ *WriteOpti
// Manually inline DeferredBatchOp.Finish()
if deferredOp.index != nil {
if err := deferredOp.index.Add(deferredOp.offset); err != nil {
// We never add duplicate entries, so an error should never occur.
panic(err)
return err
}
}
return nil
Expand All @@ -740,8 +732,7 @@ func (b experimentalBatch) RangeKeyDelete(start, end []byte, _ *WriteOptions) er
// Manually inline DeferredBatchOp.Finish().
if deferredOp.index != nil {
if err := deferredOp.index.Add(deferredOp.offset); err != nil {
// We never add duplicate entries, so an error should never occur.
panic(err)
return err
}
}
return nil
Expand Down
67 changes: 51 additions & 16 deletions internal/batchskl/skl.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,23 @@ import (
)

const (
maxHeight = 20
maxNodeSize = int(unsafe.Sizeof(node{}))
linksSize = int(unsafe.Sizeof(links{}))
maxHeight = 20
maxNodeSize = int(unsafe.Sizeof(node{}))
linksSize = int(unsafe.Sizeof(links{}))
maxNodesSize = math.MaxUint32
)

// ErrExists indicates that a duplicate record was inserted. This should never
// happen for normal usage of batchskl as every key should have a unique
// sequence number.
var ErrExists = errors.New("record with this key already exists")
var (
// ErrExists indicates that a duplicate record was inserted. This should never
// happen for normal usage of batchskl as every key should have a unique
// sequence number.
ErrExists = errors.New("record with this key already exists")

// ErrTooManyRecords is a sentinel error returned when the size of the raw
// nodes slice exceeds the maximum allowed size (currently 1 << 32 - 1). This
// corresponds to ~117 M skiplist entries.
ErrTooManyRecords = errors.New("too many records")
)

type links struct {
next uint32
Expand Down Expand Up @@ -171,9 +179,17 @@ func (s *Skiplist) Init(storage *[]byte, cmp base.Compare, abbreviatedKey base.A
s.nodes = make([]byte, 0, initBufSize)
}

// Allocate head and tail nodes.
s.head = s.newNode(maxHeight, 0, 0, 0, 0)
s.tail = s.newNode(maxHeight, 0, 0, 0, 0)
// Allocate head and tail nodes. While allocating a new node can fail, in the
// context of initializing the skiplist we consider it unrecoverable.
var err error
s.head, err = s.newNode(maxHeight, 0, 0, 0, 0)
if err != nil {
panic(err)
}
s.tail, err = s.newNode(maxHeight, 0, 0, 0, 0)
if err != nil {
panic(err)
}

// Link all head/tail levels together.
headNode := s.node(s.head)
Expand Down Expand Up @@ -230,7 +246,10 @@ func (s *Skiplist) Add(keyOffset uint32) error {
// We always insert from the base level and up. After you add a node in base
// level, we cannot create a node in the level above because it would have
// discovered the node in the base level.
nd := s.newNode(height, keyOffset, keyStart, keyEnd, abbreviatedKey)
nd, err := s.newNode(height, keyOffset, keyStart, keyEnd, abbreviatedKey)
if err != nil {
return err
}
newNode := s.node(nd)
for level := uint32(0); level < height; level++ {
next := spl[level].next
Expand All @@ -255,23 +274,26 @@ func (s *Skiplist) NewIter(lower, upper []byte) Iterator {
}

func (s *Skiplist) newNode(height,
offset, keyStart, keyEnd uint32, abbreviatedKey uint64) uint32 {
offset, keyStart, keyEnd uint32, abbreviatedKey uint64) (uint32, error) {
if height < 1 || height > maxHeight {
panic("height cannot be less than one or greater than the max height")
}

unusedSize := (maxHeight - int(height)) * linksSize
nodeOffset := s.alloc(uint32(maxNodeSize - unusedSize))
nodeOffset, err := s.alloc(uint32(maxNodeSize - unusedSize))
if err != nil {
return 0, err
}
nd := s.node(nodeOffset)

nd.offset = offset
nd.keyStart = keyStart
nd.keyEnd = keyEnd
nd.abbreviatedKey = abbreviatedKey
return nodeOffset
return nodeOffset, nil
}

func (s *Skiplist) alloc(size uint32) uint32 {
func (s *Skiplist) alloc(size uint32) (uint32, error) {
offset := len(s.nodes)

// We only have a need for memory up to offset + size, but we never want
Expand All @@ -282,14 +304,27 @@ func (s *Skiplist) alloc(size uint32) uint32 {
if allocSize < minAllocSize {
allocSize = minAllocSize
}
// Cap the allocation at the max allowed size to avoid wasted capacity.
if allocSize > maxNodesSize {
// The new record may still not fit within the allocation, in which case
// we return early with an error. This avoids the panic below when we
// resize the slice. It also avoids the allocation and copy.
if uint64(offset)+uint64(size) > maxNodesSize {
return 0, errors.Wrapf(ErrTooManyRecords,
"alloc of new record (size=%d) would overflow uint32 (current size=%d)",
uint64(offset)+uint64(size), offset,
)
}
allocSize = maxNodesSize
}
tmp := make([]byte, len(s.nodes), allocSize)
copy(tmp, s.nodes)
s.nodes = tmp
}

newSize := uint32(offset) + size
s.nodes = s.nodes[:newSize]
return uint32(offset)
return uint32(offset), nil
}

func (s *Skiplist) node(offset uint32) *node {
Expand Down
25 changes: 23 additions & 2 deletions internal/batchskl/skl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
Expand Down Expand Up @@ -215,6 +216,24 @@ func TestSkiplistAdd(t *testing.T) {
require.Equal(t, 6, lengthRev(l))
}

func TestSkiplistAdd_Overflow(t *testing.T) {
// Regression test for cockroachdb/pebble#1258. The length of the nodes buffer
// cannot exceed the maximum allowable size.
d := &testStorage{}
l := newTestSkiplist(d)

// Simulate a full nodes slice. This speeds up the test significantly, as
// opposed to adding data to the list.
l.nodes = make([]byte, maxNodesSize)

// Adding a new node to the list would overflow the nodes slice. Note that it
// is the size of a new node struct that is relevant here, rather than the
// size of the data being added to the list.
err := l.Add(d.add("too much!"))
require.Error(t, err)
require.True(t, errors.Is(err, ErrTooManyRecords))
}

// TestIteratorNext tests a basic iteration over all nodes from the beginning.
func TestIteratorNext(t *testing.T) {
const n = 100
Expand Down Expand Up @@ -480,7 +499,8 @@ func BenchmarkIterNext(b *testing.B) {
for len(d.data)+20 < cap(d.data) {
key := randomKey(rng, buf[:])
offset := d.addBytes(key)
_ = l.Add(offset)
err := l.Add(offset)
require.NoError(b, err)
}

it := l.NewIter(nil, nil)
Expand All @@ -504,7 +524,8 @@ func BenchmarkIterPrev(b *testing.B) {
for len(d.data)+20 < cap(d.data) {
key := randomKey(rng, buf[:])
offset := d.addBytes(key)
_ = l.Add(offset)
err := l.Add(offset)
require.NoError(b, err)
}

it := l.NewIter(nil, nil)
Expand Down

0 comments on commit 38b68e1

Please sign in to comment.