Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add internal iterator to Bucket that goes over buckets. #356

Merged
merged 1 commit into from
Dec 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,9 @@ func (b *Bucket) DeleteBucket(key []byte) error {

// Recursively delete all child buckets.
child := b.Bucket(key)
err := child.ForEach(func(k, v []byte) error {
if _, _, childFlags := child.Cursor().seek(k); (childFlags & bucketLeafFlag) != 0 {
if err := child.DeleteBucket(k); err != nil {
return fmt.Errorf("delete bucket: %s", err)
}
err := child.ForEachBucket(func(k []byte) error {
if err := child.DeleteBucket(k); err != nil {
return fmt.Errorf("delete bucket: %s", err)
}
return nil
})
Expand Down Expand Up @@ -394,7 +392,22 @@ func (b *Bucket) ForEach(fn func(k, v []byte) error) error {
return nil
}

// Stat returns stats on a bucket.
func (b *Bucket) ForEachBucket(fn func(k []byte) error) error {
if b.tx.db == nil {
return ErrTxClosed
}
c := b.Cursor()
for k, _, flags := c.first(); k != nil; k, _, flags = c.next() {
if flags&bucketLeafFlag != 0 {
if err := fn(k); err != nil {
return err
}
}
}
return nil
}

// Stats returns stats on a bucket.
func (b *Bucket) Stats() BucketStats {
var s, subStats BucketStats
pageSize := b.tx.db.pageSize
Expand Down Expand Up @@ -461,7 +474,7 @@ func (b *Bucket) Stats() BucketStats {

// Keep track of maximum page depth.
if depth+1 > s.Depth {
s.Depth = (depth + 1)
s.Depth = depth + 1
}
})

Expand Down
167 changes: 122 additions & 45 deletions bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"log"
"math/rand"
"os"
Expand All @@ -13,7 +15,6 @@ import (
"testing"
"testing/quick"

"github.com/stretchr/testify/assert"
bolt "go.etcd.io/bbolt"
)

Expand Down Expand Up @@ -1005,57 +1006,133 @@ func TestBucket_ForEach(t *testing.T) {
db := MustOpenDB()
defer db.MustClose()

if err := db.Update(func(tx *bolt.Tx) error {
type kv struct {
k []byte
v []byte
}

expectedItems := []kv{
{k: []byte("bar"), v: []byte("0002")},
{k: []byte("baz"), v: []byte("0001")},
{k: []byte("csubbucket"), v: nil},
{k: []byte("foo"), v: []byte("0000")},
}

verifyReads := func(b *bolt.Bucket) {
var items []kv
err := b.ForEach(func(k, v []byte) error {
items = append(items, kv{k: k, v: v})
return nil
})
assert.NoErrorf(t, err, "b.ForEach failed")
assert.Equal(t, expectedItems, items, "what we iterated (ForEach) is not what we put")
}

err := db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucket([]byte("widgets"))
if err != nil {
t.Fatal(err)
}
if err := b.Put([]byte("foo"), []byte("0000")); err != nil {
t.Fatal(err)
}
if err := b.Put([]byte("baz"), []byte("0001")); err != nil {
t.Fatal(err)
}
if err := b.Put([]byte("bar"), []byte("0002")); err != nil {
t.Fatal(err)
}
require.NoError(t, err, "bucket creation failed")

var index int
if err := b.ForEach(func(k, v []byte) error {
switch index {
case 0:
if !bytes.Equal(k, []byte("bar")) {
t.Fatalf("unexpected key: %v", k)
} else if !bytes.Equal(v, []byte("0002")) {
t.Fatalf("unexpected value: %v", v)
}
case 1:
if !bytes.Equal(k, []byte("baz")) {
t.Fatalf("unexpected key: %v", k)
} else if !bytes.Equal(v, []byte("0001")) {
t.Fatalf("unexpected value: %v", v)
}
case 2:
if !bytes.Equal(k, []byte("foo")) {
t.Fatalf("unexpected key: %v", k)
} else if !bytes.Equal(v, []byte("0000")) {
t.Fatalf("unexpected value: %v", v)
}
}
index++
require.NoErrorf(t, b.Put([]byte("foo"), []byte("0000")), "put 'foo' failed")
require.NoErrorf(t, b.Put([]byte("baz"), []byte("0001")), "put 'baz' failed")
require.NoErrorf(t, b.Put([]byte("bar"), []byte("0002")), "put 'bar' failed")
_, err = b.CreateBucket([]byte("csubbucket"))
require.NoErrorf(t, err, "creation of subbucket failed")

verifyReads(b)

return nil
})
require.NoErrorf(t, err, "db.Update failed")
err = db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("widgets"))
require.NotNil(t, b, "bucket opening failed")
verifyReads(b)
return nil
})
assert.NoErrorf(t, err, "db.View failed")
}

func TestBucket_ForEachBucket(t *testing.T) {
db := MustOpenDB()
defer db.MustClose()

expectedItems := [][]byte{
[]byte("csubbucket"),
[]byte("zsubbucket"),
}

verifyReads := func(b *bolt.Bucket) {
var items [][]byte
err := b.ForEachBucket(func(k []byte) error {
items = append(items, k)
return nil
}); err != nil {
t.Fatal(err)
}
})
assert.NoErrorf(t, err, "b.ForEach failed")
assert.Equal(t, expectedItems, items, "what we iterated (ForEach) is not what we put")
}

if index != 3 {
t.Fatalf("unexpected index: %d", index)
}
err := db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucket([]byte("widgets"))
require.NoError(t, err, "bucket creation failed")

require.NoErrorf(t, b.Put([]byte("foo"), []byte("0000")), "put 'foo' failed")
_, err = b.CreateBucket([]byte("zsubbucket"))
require.NoErrorf(t, err, "creation of subbucket failed")
require.NoErrorf(t, b.Put([]byte("baz"), []byte("0001")), "put 'baz' failed")
require.NoErrorf(t, b.Put([]byte("bar"), []byte("0002")), "put 'bar' failed")
_, err = b.CreateBucket([]byte("csubbucket"))
require.NoErrorf(t, err, "creation of subbucket failed")

verifyReads(b)

return nil
}); err != nil {
t.Fatal(err)
})
assert.NoErrorf(t, err, "db.Update failed")
err = db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("widgets"))
require.NotNil(t, b, "bucket opening failed")
verifyReads(b)
return nil
})
assert.NoErrorf(t, err, "db.View failed")
}

func TestBucket_ForEachBucket_NoBuckets(t *testing.T) {
db := MustOpenDB()
defer db.MustClose()

verifyReads := func(b *bolt.Bucket) {
var items [][]byte
err := b.ForEachBucket(func(k []byte) error {
items = append(items, k)
return nil
})
assert.NoErrorf(t, err, "b.ForEach failed")
assert.Emptyf(t, items, "what we iterated (ForEach) is not what we put")
}

err := db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucket([]byte("widgets"))
require.NoError(t, err, "bucket creation failed")

require.NoErrorf(t, b.Put([]byte("foo"), []byte("0000")), "put 'foo' failed")
require.NoErrorf(t, err, "creation of subbucket failed")
require.NoErrorf(t, b.Put([]byte("baz"), []byte("0001")), "put 'baz' failed")
require.NoErrorf(t, err, "creation of subbucket failed")

verifyReads(b)

return nil
})
require.NoErrorf(t, err, "db.Update failed")

err = db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("widgets"))
require.NotNil(t, b, "bucket opening failed")
verifyReads(b)
return nil
})
assert.NoErrorf(t, err, "db.View failed")
}

// Ensure a database can stop iteration early.
Expand Down
19 changes: 13 additions & 6 deletions cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,18 @@ func (c *Cursor) Bucket() *Bucket {
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) First() (key []byte, value []byte) {
_assert(c.bucket.tx.db != nil, "tx closed")
k, v, flags := c.first()
if (flags & uint32(bucketLeafFlag)) != 0 {
return k, nil
}
return k, v
}

func (c *Cursor) first() (key []byte, value []byte, flags uint32) {
c.stack = c.stack[:0]
p, n := c.bucket.pageNode(c.bucket.root)
c.stack = append(c.stack, elemRef{page: p, node: n, index: 0})
c.first()
c.goToFirstElementOnTheStack()

// If we land on an empty page then move to the next value.
// https://github.com/boltdb/bolt/issues/450
Expand All @@ -43,10 +51,9 @@ func (c *Cursor) First() (key []byte, value []byte) {

k, v, flags := c.keyValue()
if (flags & uint32(bucketLeafFlag)) != 0 {
return k, nil
return k, nil, flags
}
return k, v

return k, v, flags
}

// Last moves the cursor to the last item in the bucket and returns its key and value.
Expand Down Expand Up @@ -155,7 +162,7 @@ func (c *Cursor) seek(seek []byte) (key []byte, value []byte, flags uint32) {
}

// first moves the cursor to the first leaf element under the last page in the stack.
func (c *Cursor) first() {
func (c *Cursor) goToFirstElementOnTheStack() {
for {
// Exit when we hit a leaf page.
var ref = &c.stack[len(c.stack)-1]
Expand Down Expand Up @@ -223,7 +230,7 @@ func (c *Cursor) next() (key []byte, value []byte, flags uint32) {
// Otherwise start from where we left off in the stack and find the
// first element of the first leaf page.
c.stack = c.stack[:i+1]
c.first()
c.goToFirstElementOnTheStack()

// If this is an empty page then restart and move back up the stack.
// https://github.com/boltdb/bolt/issues/450
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.17

require (
github.com/stretchr/testify v1.8.1
golang.org/x/sys v0.2.0
golang.org/x/sys v0.3.0
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
2 changes: 1 addition & 1 deletion tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bo
})

// Check each bucket within this bucket.
_ = b.ForEach(func(k, v []byte) error {
_ = b.ForEachBucket(func(k []byte) error {
if child := b.Bucket(k); child != nil {
tx.checkBucket(child, reachable, freed, ch)
}
Expand Down