Skip to content

Commit

Permalink
Add internal iterator to Bucket that goes over buckets.
Browse files Browse the repository at this point in the history
So far the code was frequently traversing all the keys (ignoring the flag that marks a key as a bucket)
and trying to open each of the keys as a bucket (seeking the same entry again from scratch).

In this proposal, we iterate only through bucket keys.

Signed-off-by: Piotr Tabor <ptab@google.com>
  • Loading branch information
ptabor committed Dec 21, 2022
1 parent fa80cee commit ebca452
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 62 deletions.
27 changes: 20 additions & 7 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,9 @@ func (b *Bucket) DeleteBucket(key []byte) error {

// Recursively delete all child buckets.
child := b.Bucket(key)
err := child.ForEach(func(k, v []byte) error {
if _, _, childFlags := child.Cursor().seek(k); (childFlags & bucketLeafFlag) != 0 {
if err := child.DeleteBucket(k); err != nil {
return fmt.Errorf("delete bucket: %s", err)
}
err := child.ForEachBucket(func(k []byte) error {
if err := child.DeleteBucket(k); err != nil {
return fmt.Errorf("delete bucket: %s", err)
}
return nil
})
Expand Down Expand Up @@ -394,7 +392,22 @@ func (b *Bucket) ForEach(fn func(k, v []byte) error) error {
return nil
}

// Stat returns stats on a bucket.
// ForEachBucket executes fn for the key of every entry in the bucket whose
// bucketLeafFlag is set, i.e. for every nested bucket, visiting them in the
// order the bucket's cursor yields them. Plain key/value entries are skipped.
// Iteration stops at the first non-nil error returned by fn, and that error is
// returned to the caller. ErrTxClosed is returned if the transaction has no
// database attached.
func (b *Bucket) ForEachBucket(fn func(k []byte) error) error {
	if b.tx.db == nil {
		return ErrTxClosed
	}

	cur := b.Cursor()
	for key, _, entryFlags := cur.first(); key != nil; key, _, entryFlags = cur.next() {
		// Entries without the bucket flag are ordinary key/value pairs.
		if entryFlags&bucketLeafFlag == 0 {
			continue
		}
		if err := fn(key); err != nil {
			return err
		}
	}
	return nil
}

// Stats returns stats on a bucket.
func (b *Bucket) Stats() BucketStats {
var s, subStats BucketStats
pageSize := b.tx.db.pageSize
Expand Down Expand Up @@ -461,7 +474,7 @@ func (b *Bucket) Stats() BucketStats {

// Keep track of maximum page depth.
if depth+1 > s.Depth {
s.Depth = (depth + 1)
s.Depth = depth + 1
}
})

Expand Down
167 changes: 122 additions & 45 deletions bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"log"
"math/rand"
"os"
Expand All @@ -13,7 +15,6 @@ import (
"testing"
"testing/quick"

"github.com/stretchr/testify/assert"
bolt "go.etcd.io/bbolt"
)

Expand Down Expand Up @@ -1005,57 +1006,133 @@ func TestBucket_ForEach(t *testing.T) {
db := MustOpenDB()
defer db.MustClose()

if err := db.Update(func(tx *bolt.Tx) error {
type kv struct {
k []byte
v []byte
}

expectedItems := []kv{
{k: []byte("bar"), v: []byte("0002")},
{k: []byte("baz"), v: []byte("0001")},
{k: []byte("csubbucket"), v: nil},
{k: []byte("foo"), v: []byte("0000")},
}

verifyReads := func(b *bolt.Bucket) {
var items []kv
err := b.ForEach(func(k, v []byte) error {
items = append(items, kv{k: k, v: v})
return nil
})
assert.NoErrorf(t, err, "b.ForEach failed")
assert.Equal(t, expectedItems, items, "what we iterated (ForEach) is not what we put")
}

err := db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucket([]byte("widgets"))
if err != nil {
t.Fatal(err)
}
if err := b.Put([]byte("foo"), []byte("0000")); err != nil {
t.Fatal(err)
}
if err := b.Put([]byte("baz"), []byte("0001")); err != nil {
t.Fatal(err)
}
if err := b.Put([]byte("bar"), []byte("0002")); err != nil {
t.Fatal(err)
}
require.NoError(t, err, "bucket creation failed")

var index int
if err := b.ForEach(func(k, v []byte) error {
switch index {
case 0:
if !bytes.Equal(k, []byte("bar")) {
t.Fatalf("unexpected key: %v", k)
} else if !bytes.Equal(v, []byte("0002")) {
t.Fatalf("unexpected value: %v", v)
}
case 1:
if !bytes.Equal(k, []byte("baz")) {
t.Fatalf("unexpected key: %v", k)
} else if !bytes.Equal(v, []byte("0001")) {
t.Fatalf("unexpected value: %v", v)
}
case 2:
if !bytes.Equal(k, []byte("foo")) {
t.Fatalf("unexpected key: %v", k)
} else if !bytes.Equal(v, []byte("0000")) {
t.Fatalf("unexpected value: %v", v)
}
}
index++
require.NoErrorf(t, b.Put([]byte("foo"), []byte("0000")), "put 'foo' failed")
require.NoErrorf(t, b.Put([]byte("baz"), []byte("0001")), "put 'baz' failed")
require.NoErrorf(t, b.Put([]byte("bar"), []byte("0002")), "put 'bar' failed")
_, err = b.CreateBucket([]byte("csubbucket"))
require.NoErrorf(t, err, "creation of subbucket failed")

verifyReads(b)

return nil
})
require.NoErrorf(t, err, "db.Update failed")
err = db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("widgets"))
require.NotNil(t, b, "bucket opening failed")
verifyReads(b)
return nil
})
assert.NoErrorf(t, err, "db.View failed")
}

func TestBucket_ForEachBucket(t *testing.T) {
db := MustOpenDB()
defer db.MustClose()

expectedItems := [][]byte{
[]byte("csubbucket"),
[]byte("zsubbucket"),
}

verifyReads := func(b *bolt.Bucket) {
var items [][]byte
err := b.ForEachBucket(func(k []byte) error {
items = append(items, k)
return nil
}); err != nil {
t.Fatal(err)
}
})
assert.NoErrorf(t, err, "b.ForEach failed")
assert.Equal(t, expectedItems, items, "what we iterated (ForEach) is not what we put")
}

if index != 3 {
t.Fatalf("unexpected index: %d", index)
}
err := db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucket([]byte("widgets"))
require.NoError(t, err, "bucket creation failed")

require.NoErrorf(t, b.Put([]byte("foo"), []byte("0000")), "put 'foo' failed")
_, err = b.CreateBucket([]byte("zsubbucket"))
require.NoErrorf(t, err, "creation of subbucket failed")
require.NoErrorf(t, b.Put([]byte("baz"), []byte("0001")), "put 'baz' failed")
require.NoErrorf(t, b.Put([]byte("bar"), []byte("0002")), "put 'bar' failed")
_, err = b.CreateBucket([]byte("csubbucket"))
require.NoErrorf(t, err, "creation of subbucket failed")

verifyReads(b)

return nil
}); err != nil {
t.Fatal(err)
})
assert.NoErrorf(t, err, "db.Update failed")
err = db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("widgets"))
require.NotNil(t, b, "bucket opening failed")
verifyReads(b)
return nil
})
assert.NoErrorf(t, err, "db.View failed")
}

// TestBucket_ForEachBucket_NoBuckets ensures that ForEachBucket never invokes
// its callback on a bucket that holds only plain key/value pairs. The check is
// run inside the update transaction that writes the keys and again in a
// subsequent view transaction.
func TestBucket_ForEachBucket_NoBuckets(t *testing.T) {
	db := MustOpenDB()
	defer db.MustClose()

	// verifyReads walks b with ForEachBucket and expects no key to be reported.
	verifyReads := func(b *bolt.Bucket) {
		var items [][]byte
		err := b.ForEachBucket(func(k []byte) error {
			items = append(items, k)
			return nil
		})
		assert.NoErrorf(t, err, "b.ForEachBucket failed")
		assert.Emptyf(t, items, "ForEachBucket reported keys although no sub-bucket was created")
	}

	err := db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucket([]byte("widgets"))
		require.NoError(t, err, "bucket creation failed")

		// Only plain keys are stored; no sub-bucket is created on purpose.
		require.NoErrorf(t, b.Put([]byte("foo"), []byte("0000")), "put 'foo' failed")
		require.NoErrorf(t, b.Put([]byte("baz"), []byte("0001")), "put 'baz' failed")

		verifyReads(b)

		return nil
	})
	require.NoErrorf(t, err, "db.Update failed")

	err = db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("widgets"))
		require.NotNil(t, b, "bucket opening failed")
		verifyReads(b)
		return nil
	})
	assert.NoErrorf(t, err, "db.View failed")
}

// Ensure a database can stop iteration early.
Expand Down
19 changes: 13 additions & 6 deletions cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,18 @@ func (c *Cursor) Bucket() *Bucket {
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) First() (key []byte, value []byte) {
	_assert(c.bucket.tx.db != nil, "tx closed")

	var flags uint32
	key, value, flags = c.first()

	// An entry flagged as a bucket is reported with a nil value.
	if flags&uint32(bucketLeafFlag) != 0 {
		value = nil
	}
	return key, value
}

func (c *Cursor) first() (key []byte, value []byte, flags uint32) {
c.stack = c.stack[:0]
p, n := c.bucket.pageNode(c.bucket.root)
c.stack = append(c.stack, elemRef{page: p, node: n, index: 0})
c.first()
c.goToFirstElementOnTheStack()

// If we land on an empty page then move to the next value.
// https://github.com/boltdb/bolt/issues/450
Expand All @@ -43,10 +51,9 @@ func (c *Cursor) First() (key []byte, value []byte) {

k, v, flags := c.keyValue()
if (flags & uint32(bucketLeafFlag)) != 0 {
return k, nil
return k, nil, flags
}
return k, v

return k, v, flags
}

// Last moves the cursor to the last item in the bucket and returns its key and value.
Expand Down Expand Up @@ -155,7 +162,7 @@ func (c *Cursor) seek(seek []byte) (key []byte, value []byte, flags uint32) {
}

// goToFirstElementOnTheStack moves the cursor to the first leaf element under the last page in the stack.
func (c *Cursor) first() {
func (c *Cursor) goToFirstElementOnTheStack() {
for {
// Exit when we hit a leaf page.
var ref = &c.stack[len(c.stack)-1]
Expand Down Expand Up @@ -223,7 +230,7 @@ func (c *Cursor) next() (key []byte, value []byte, flags uint32) {
// Otherwise start from where we left off in the stack and find the
// first element of the first leaf page.
c.stack = c.stack[:i+1]
c.first()
c.goToFirstElementOnTheStack()

// If this is an empty page then restart and move back up the stack.
// https://github.com/boltdb/bolt/issues/450
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.17

require (
github.com/stretchr/testify v1.8.1
golang.org/x/sys v0.2.0
golang.org/x/sys v0.3.0
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
2 changes: 1 addition & 1 deletion tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bo
})

// Check each bucket within this bucket.
_ = b.ForEach(func(k, v []byte) error {
_ = b.ForEachBucket(func(k []byte) error {
if child := b.Bucket(k); child != nil {
tx.checkBucket(child, reachable, freed, ch)
}
Expand Down

0 comments on commit ebca452

Please sign in to comment.