Skip to content

Commit

Permalink
Add internal iterator to Bucket that goes over buckets.
Browse files Browse the repository at this point in the history
So far the code was frequently traversing all the keys (ignoring flag whether key is a bucket)
and trying to open each of the keys as bucket (seeking the same entry from the scratch).

In this proposal, we iterate only through bucket keys.

Signed-off-by: Piotr Tabor <ptab@google.com>
  • Loading branch information
ptabor committed Dec 17, 2022
1 parent 1d5a2b0 commit 7e057ea
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 58 deletions.
27 changes: 20 additions & 7 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,9 @@ func (b *Bucket) DeleteBucket(key []byte) error {

// Recursively delete all child buckets.
child := b.Bucket(key)
err := child.ForEach(func(k, v []byte) error {
if _, _, childFlags := child.Cursor().seek(k); (childFlags & bucketLeafFlag) != 0 {
if err := child.DeleteBucket(k); err != nil {
return fmt.Errorf("delete bucket: %s", err)
}
err := child.ForEachBucket(func(k []byte) error {
if err := child.DeleteBucket(k); err != nil {
return fmt.Errorf("delete bucket: %s", err)
}
return nil
})
Expand Down Expand Up @@ -394,7 +392,22 @@ func (b *Bucket) ForEach(fn func(k, v []byte) error) error {
return nil
}

// Stat returns stats on a bucket.
func (b *Bucket) ForEachBucket(fn func(k []byte) error) error {
if b.tx.db == nil {
return ErrTxClosed
}
c := b.Cursor()
for k, _, flags := c.first(); k != nil; k, _, flags = c.next() {
if flags&bucketLeafFlag != 0 {
if err := fn(k); err != nil {
return err
}
}
}
return nil
}

// Stats returns stats on a bucket.
func (b *Bucket) Stats() BucketStats {
var s, subStats BucketStats
pageSize := b.tx.db.pageSize
Expand Down Expand Up @@ -461,7 +474,7 @@ func (b *Bucket) Stats() BucketStats {

// Keep track of maximum page depth.
if depth+1 > s.Depth {
s.Depth = (depth + 1)
s.Depth = depth + 1
}
})

Expand Down
165 changes: 121 additions & 44 deletions bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/stretchr/testify/assert"
"log"
"math/rand"
"os"
Expand Down Expand Up @@ -1003,57 +1004,133 @@ func TestBucket_ForEach(t *testing.T) {
db := MustOpenDB()
defer db.MustClose()

if err := db.Update(func(tx *bolt.Tx) error {
type kv struct {
k []byte
v []byte
}

expectedItems := []kv{
{k: []byte("bar"), v: []byte("0002")},
{k: []byte("baz"), v: []byte("0001")},
{k: []byte("csubbucket"), v: nil},
{k: []byte("foo"), v: []byte("0000")},
}

verifyReads := func(b *bolt.Bucket) {
var items []kv
err := b.ForEach(func(k, v []byte) error {
items = append(items, kv{k: k, v: v})
return nil
})
assert.NoErrorf(t, err, "b.ForEach failed")
assert.Equal(t, expectedItems, items, "what we iterated (ForEach) is not what we put")
}

err := db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucket([]byte("widgets"))
if err != nil {
t.Fatal(err)
}
if err := b.Put([]byte("foo"), []byte("0000")); err != nil {
t.Fatal(err)
}
if err := b.Put([]byte("baz"), []byte("0001")); err != nil {
t.Fatal(err)
}
if err := b.Put([]byte("bar"), []byte("0002")); err != nil {
t.Fatal(err)
}
assert.NoError(t, err, "bucket creation failed")

var index int
if err := b.ForEach(func(k, v []byte) error {
switch index {
case 0:
if !bytes.Equal(k, []byte("bar")) {
t.Fatalf("unexpected key: %v", k)
} else if !bytes.Equal(v, []byte("0002")) {
t.Fatalf("unexpected value: %v", v)
}
case 1:
if !bytes.Equal(k, []byte("baz")) {
t.Fatalf("unexpected key: %v", k)
} else if !bytes.Equal(v, []byte("0001")) {
t.Fatalf("unexpected value: %v", v)
}
case 2:
if !bytes.Equal(k, []byte("foo")) {
t.Fatalf("unexpected key: %v", k)
} else if !bytes.Equal(v, []byte("0000")) {
t.Fatalf("unexpected value: %v", v)
}
}
index++
assert.NoErrorf(t, b.Put([]byte("foo"), []byte("0000")), "put 'foo' failed")
assert.NoErrorf(t, b.Put([]byte("baz"), []byte("0001")), "put 'baz' failed")
assert.NoErrorf(t, b.Put([]byte("bar"), []byte("0002")), "put 'bar' failed")
_, err = b.CreateBucket([]byte("csubbucket"))
assert.NoErrorf(t, err, "creation of subbucket failed")

verifyReads(b)

return nil
})
assert.NoErrorf(t, err, "db.Update failed")
err = db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("widgets"))
assert.NotNil(t, b, "bucket opening failed")
verifyReads(b)
return nil
})
assert.NoErrorf(t, err, "db.View failed")
}

func TestBucket_ForEachBucket(t *testing.T) {
db := MustOpenDB()
defer db.MustClose()

expectedItems := [][]byte{
[]byte("csubbucket"),
[]byte("zsubbucket"),
}

verifyReads := func(b *bolt.Bucket) {
var items [][]byte
err := b.ForEachBucket(func(k []byte) error {
items = append(items, k)
return nil
}); err != nil {
t.Fatal(err)
}
})
assert.NoErrorf(t, err, "b.ForEach failed")
assert.Equal(t, expectedItems, items, "what we iterated (ForEach) is not what we put")
}

if index != 3 {
t.Fatalf("unexpected index: %d", index)
}
err := db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucket([]byte("widgets"))
assert.NoError(t, err, "bucket creation failed")

assert.NoErrorf(t, b.Put([]byte("foo"), []byte("0000")), "put 'foo' failed")
_, err = b.CreateBucket([]byte("zsubbucket"))
assert.NoErrorf(t, err, "creation of subbucket failed")
assert.NoErrorf(t, b.Put([]byte("baz"), []byte("0001")), "put 'baz' failed")
assert.NoErrorf(t, b.Put([]byte("bar"), []byte("0002")), "put 'bar' failed")
_, err = b.CreateBucket([]byte("csubbucket"))
assert.NoErrorf(t, err, "creation of subbucket failed")

verifyReads(b)

return nil
}); err != nil {
t.Fatal(err)
})
assert.NoErrorf(t, err, "db.Update failed")
err = db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("widgets"))
assert.NotNil(t, b, "bucket opening failed")
verifyReads(b)
return nil
})
assert.NoErrorf(t, err, "db.View failed")
}

func TestBucket_ForEachBucket_NoBuckets(t *testing.T) {
db := MustOpenDB()
defer db.MustClose()

verifyReads := func(b *bolt.Bucket) {
var items [][]byte
err := b.ForEachBucket(func(k []byte) error {
items = append(items, k)
return nil
})
assert.NoErrorf(t, err, "b.ForEach failed")
assert.Emptyf(t, items, "what we iterated (ForEach) is not what we put")
}

err := db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucket([]byte("widgets"))
assert.NoError(t, err, "bucket creation failed")

assert.NoErrorf(t, b.Put([]byte("foo"), []byte("0000")), "put 'foo' failed")
assert.NoErrorf(t, err, "creation of subbucket failed")
assert.NoErrorf(t, b.Put([]byte("baz"), []byte("0001")), "put 'baz' failed")
assert.NoErrorf(t, err, "creation of subbucket failed")

verifyReads(b)

return nil
})
assert.NoErrorf(t, err, "db.Update failed")

err = db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("widgets"))
assert.NotNil(t, b, "bucket opening failed")
verifyReads(b)
return nil
})
assert.NoErrorf(t, err, "db.View failed")
}

// Ensure a database can stop iteration early.
Expand Down
19 changes: 13 additions & 6 deletions cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,19 @@ func (c *Cursor) Bucket() *Bucket {
// If the bucket is empty then a nil key and value are returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) First() (key []byte, value []byte) {
k, v, flags := c.first()
if (flags & uint32(bucketLeafFlag)) != 0 {
return k, nil
}
return k, v
}

func (c *Cursor) first() (key []byte, value []byte, flags uint32) {
_assert(c.bucket.tx.db != nil, "tx closed")
c.stack = c.stack[:0]
p, n := c.bucket.pageNode(c.bucket.root)
c.stack = append(c.stack, elemRef{page: p, node: n, index: 0})
c.first()
c.goToFirstElementOnTheStack()

// If we land on an empty page then move to the next value.
// https://github.com/boltdb/bolt/issues/450
Expand All @@ -43,10 +51,9 @@ func (c *Cursor) First() (key []byte, value []byte) {

k, v, flags := c.keyValue()
if (flags & uint32(bucketLeafFlag)) != 0 {
return k, nil
return k, nil, flags
}
return k, v

return k, v, flags
}

// Last moves the cursor to the last item in the bucket and returns its key and value.
Expand Down Expand Up @@ -155,7 +162,7 @@ func (c *Cursor) seek(seek []byte) (key []byte, value []byte, flags uint32) {
}

// first moves the cursor to the first leaf element under the last page in the stack.
func (c *Cursor) first() {
func (c *Cursor) goToFirstElementOnTheStack() {
for {
// Exit when we hit a leaf page.
var ref = &c.stack[len(c.stack)-1]
Expand Down Expand Up @@ -223,7 +230,7 @@ func (c *Cursor) next() (key []byte, value []byte, flags uint32) {
// Otherwise start from where we left off in the stack and find the
// first element of the first leaf page.
c.stack = c.stack[:i+1]
c.first()
c.goToFirstElementOnTheStack()

// If this is an empty page then restart and move back up the stack.
// https://github.com/boltdb/bolt/issues/450
Expand Down
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,11 @@ module go.etcd.io/bbolt
go 1.17

require golang.org/x/sys v0.2.0

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/stretchr/testify v1.8.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
17 changes: 17 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,19 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
2 changes: 1 addition & 1 deletion tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bo
})

// Check each bucket within this bucket.
_ = b.ForEach(func(k, v []byte) error {
_ = b.ForEachBucket(func(k []byte) error {
if child := b.Bucket(k); child != nil {
tx.checkBucket(child, reachable, freed, ch)
}
Expand Down

0 comments on commit 7e057ea

Please sign in to comment.